Sharded compaction #3537

Closed · wants to merge 17 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
* [ENHANCEMENT] Add new (unsafe) query hints for metrics queries [#3396](https://github.com/grafana/tempo/pull/3396) (@mdisibio)
* [ENHANCEMENT] Add nestedSetLeft/Right/Parent intrinsics to TraceQL. [#3497](https://github.com/grafana/tempo/pull/3497) (@joe-elliott)
* [ENHANCEMENT] Add tenant to frontend job cache key. [#3527](https://github.com/grafana/tempo/pull/3527) (@joe-elliott)
* [ENHANCEMENT] Add new sharding compactor [#3537](https://github.com/grafana/tempo/pull/3537) (@mdisibio)
* [BUGFIX] Fix metrics query results when filtering and rating on the same attribute [#3428](https://github.com/grafana/tempo/issues/3428) (@mdisibio)
* [BUGFIX] Fix metrics query results when series contain empty strings or nil values [#3429](https://github.com/grafana/tempo/issues/3429) (@mdisibio)
* [BUGFIX] Fix metrics query duration check, add per-tenant override for max metrics query duration [#3479](https://github.com/grafana/tempo/issues/3479) (@mdisibio)
6 changes: 6 additions & 0 deletions docs/sources/tempo/configuration/_index.md
@@ -674,6 +674,9 @@ compactor:
# Note: The default will be used if the value is set to 0.
[compaction_cycle: <duration>]

# Optional. The number of shards to split compacted blocks into. Set to 2 or higher to enable. Default is 0 (disabled).
[shards: <int>]

# Optional. Amount of data to buffer from input blocks. Default is 5 MiB.
[v2_in_buffer_bytes: <int>]

@@ -1292,6 +1295,9 @@ overrides:
# Per-user compaction window. If this value is set to 0 (default),
# then compaction_window in the compactor configuration is used.
[compaction_window: <duration> | default = 0s]
# Per-user compaction shard count. If this value is set to 0 (default),
# then shards in the compactor configuration is used.
[shards: <int> | default = 0]

# Metrics-generator related overrides
metrics_generator:
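
For illustration, a minimal sketch of enabling the feature (values hypothetical; the configuration reference above places these options under the compactor's compaction block):

compactor:
  compaction:
    compaction_cycle: 30s
    # 2 or higher enables sharded compaction; each compaction round then
    # writes this many output blocks, partitioned by trace ID.
    shards: 3

A per-tenant shards override of 0 (the default) falls back to this global value.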
1 change: 1 addition & 0 deletions docs/sources/tempo/configuration/manifest.md
@@ -366,6 +366,7 @@ compactor:
retention_concurrency: 10
max_time_per_tenant: 5m0s
compaction_cycle: 30s
shards: 0
override_ring_key: compactor
ingester:
lifecycler:
4 changes: 4 additions & 0 deletions modules/compactor/compactor.go
@@ -262,6 +262,10 @@ func (c *Compactor) MaxCompactionRangeForTenant(tenantID string) time.D
return c.overrides.MaxCompactionRange(tenantID)
}

func (c *Compactor) ShardsForTenant(tenantID string) int {
return c.overrides.CompactionShards(tenantID)
}

func (c *Compactor) isSharded() bool {
return c.cfg.ShardingRing.KVStore.Store != ""
}
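
To make the fallback concrete, a standalone sketch of the resolution order implied by the docs (hypothetical helper, not code from this PR):

package main

import "fmt"

// resolveShards picks the effective shard count for a tenant: a non-zero
// per-tenant override wins, otherwise the compactor's global setting applies.
// Per the docs above, sharding only engages at 2 or more shards.
func resolveShards(tenantOverride, global int) int {
	shards := tenantOverride
	if shards == 0 {
		shards = global
	}
	if shards < 2 {
		return 0 // disabled
	}
	return shards
}

func main() {
	fmt.Println(resolveShards(0, 3)) // 3: falls back to the global setting
	fmt.Println(resolveShards(4, 3)) // 4: per-tenant override wins
	fmt.Println(resolveShards(1, 0)) // 0: below 2, so sharding stays disabled
}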
1 change: 1 addition & 0 deletions modules/overrides/config.go
@@ -155,6 +155,7 @@ type CompactionOverrides struct {
// Compactor enforced overrides.
BlockRetention model.Duration `yaml:"block_retention,omitempty" json:"block_retention,omitempty"`
CompactionWindow model.Duration `yaml:"compaction_window,omitempty" json:"compaction_window,omitempty"`
Shards int `yaml:"shards,omitempty" json:"shards,omitempty"`
}

type GlobalOverrides struct {
3 changes: 3 additions & 0 deletions modules/overrides/config_legacy.go
@@ -53,6 +53,7 @@ func (c *Overrides) toLegacy() LegacyOverrides {

BlockRetention: c.Compaction.BlockRetention,
CompactionWindow: c.Compaction.CompactionWindow,
CompactionShards: c.Compaction.Shards,

MaxBytesPerTagValuesQuery: c.Read.MaxBytesPerTagValuesQuery,
MaxBlocksPerTagValuesQuery: c.Read.MaxBlocksPerTagValuesQuery,
@@ -113,6 +114,7 @@ type LegacyOverrides struct {
// Compactor enforced limits.
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`
CompactionWindow model.Duration `yaml:"compaction_window" json:"compaction_window"`
CompactionShards int `yaml:"compaction_shards" json:"compaction_shards"`

// Querier and Ingester enforced limits.
MaxBytesPerTagValuesQuery int `yaml:"max_bytes_per_tag_values_query" json:"max_bytes_per_tag_values_query"`
@@ -151,6 +153,7 @@ func (l *LegacyOverrides) toNewLimits() Overrides {
Compaction: CompactionOverrides{
BlockRetention: l.BlockRetention,
CompactionWindow: l.CompactionWindow,
Shards: l.CompactionShards,
},
MetricsGenerator: MetricsGeneratorOverrides{
RingSize: l.MetricsGeneratorRingSize,
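
Note the key rename across formats: the legacy per-tenant overrides file uses compaction_shards at the top level of a tenant's entry, while the new format nests shards under compaction. A sketch (tenant name and value hypothetical; nesting inferred from the yaml tags in this diff):

# Legacy overrides format
overrides:
  "tenant-a":
    compaction_shards: 4

# New overrides format
overrides:
  "tenant-a":
    compaction:
      shards: 4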
1 change: 1 addition & 0 deletions modules/overrides/interface.go
@@ -72,6 +72,7 @@ type Interface interface {
MaxMetricsDuration(userID string) time.Duration
DedicatedColumns(userID string) backend.DedicatedColumns
UnsafeQueryHints(userID string) bool
CompactionShards(userID string) int

// Management API
WriteStatusRuntimeConfig(w io.Writer, r *http.Request) error
4 changes: 4 additions & 0 deletions modules/overrides/runtime_config_overrides.go
@@ -309,6 +309,10 @@ func (o *runtimeConfigOverridesManager) MaxCompactionRange(userID string) time.D
return time.Duration(o.getOverridesForUser(userID).Compaction.CompactionWindow)
}

func (o *runtimeConfigOverridesManager) CompactionShards(userID string) int {
return o.getOverridesForUser(userID).Compaction.Shards
}

// IngestionRateLimitBytes is the number of spans per second allowed for this tenant.
func (o *runtimeConfigOverridesManager) IngestionRateLimitBytes(userID string) float64 {
return float64(o.getOverridesForUser(userID).Ingestion.RateLimitBytes)
24 changes: 17 additions & 7 deletions modules/querier/querier_query_range.go
@@ -13,6 +13,7 @@ import (
v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/pkg/util/log"
"github.com/grafana/tempo/pkg/util/traceidboundary"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/opentracing/opentracing-go"
@@ -64,24 +65,33 @@ func (q *Querier) queryBackend(ctx context.Context, req *tempopb.QueryRangeReque
return nil, err
}

// Get blocks that overlap this time range
checkShard := func(min, max []byte) bool {
return true
}
if req.ShardCount > 1 {
_, checkShard = traceidboundary.Funcs(req.ShardID, req.ShardCount)
}

// Get blocks that overlap this time range and shard
metas := q.store.BlockMetas(tenantID)
withinTimeRange := metas[:0]
matchingBlocks := metas[:0]
for _, m := range metas {
if m.StartTime.UnixNano() <= int64(req.End) && m.EndTime.UnixNano() > int64(req.Start) {
withinTimeRange = append(withinTimeRange, m)
if m.StartTime.UnixNano() <= int64(req.End) &&
m.EndTime.UnixNano() > int64(req.Start) &&
checkShard(m.MinID, m.MaxID) {
matchingBlocks = append(matchingBlocks, m)
}
}

if len(withinTimeRange) == 0 {
if len(matchingBlocks) == 0 {
return nil, nil
}

unsafe := q.limits.UnsafeQueryHints(tenantID)

// Optimization
// If there's only 1 block then dedupe not needed.
dedupe := len(withinTimeRange) > 1
dedupe := len(matchingBlocks) > 1

expr, err := traceql.Parse(req.Query)
if err != nil {
@@ -106,7 +116,7 @@ func (q *Querier) queryBackend(ctx context.Context, req *tempopb.QueryRangeReque
wg := boundedwaitgroup.New(uint(concurrency))
jobErr := atomic.Error{}

for _, m := range withinTimeRange {
for _, m := range matchingBlocks {
// If a job errored then quit immediately.
if err := jobErr.Load(); err != nil {
return nil, err
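
As a self-contained sketch of how the querier-side shard filter behaves (shard numbers hypothetical; testRange's parameters are taken to be a block's min and max trace IDs, per the call above):

package main

import (
	"bytes"
	"fmt"

	"github.com/grafana/tempo/pkg/util/traceidboundary"
	"github.com/grafana/tempo/tempodb/backend"
)

func main() {
	// Shard 3 of 8: testRange reports whether any trace ID in [min, max]
	// could fall inside this shard.
	_, testRange := traceidboundary.Funcs(3, 8)

	m := &backend.BlockMeta{
		MinID: make([]byte, 16),               // lowest possible trace ID
		MaxID: bytes.Repeat([]byte{0xFF}, 16), // highest possible trace ID
	}

	// A block spanning the entire ID space overlaps every shard.
	fmt.Println(testRange(m.MinID, m.MaxID)) // true
}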
38 changes: 28 additions & 10 deletions pkg/util/traceidboundary/traceidboundary.go
@@ -10,6 +10,15 @@ type Boundary struct {
Min, Max []byte
}

var _8byteRegions = []struct {
min, max uint64
}{
{0x0000000000000000, 0x00FFFFFFFFFFFFFF}, // Region with upper byte = 0
{0x0100000000000000, 0x0FFFFFFFFFFFFFFF}, // Region with upper nibble = 0
{0x1000000000000000, 0x7FFFFFFFFFFFFFFF}, // Region for 63-bit IDs (upper bit = 0)
{0x8000000000000000, 0xFFFFFFFFFFFFFFFF}, // Region for true 64-bit IDs
}

// Pairs returns the boundaries that match trace IDs in that shard. Internally this is
// similar to how queriers divide the block ID-space, but here it's trace IDs instead.
// The inputs are 1-based because it seems more readable: shard 1 of 10. Most boundaries
@@ -84,16 +93,7 @@ func Funcs(shard, of uint32) (testSingle func([]byte) bool, testRange func([]byt
func complicatedShardingFor8ByteIDs(shard, of uint32) []Boundary {
var results []Boundary

regions := []struct {
min, max uint64
}{
{0x0000000000000000, 0x00FFFFFFFFFFFFFF}, // Region with upper byte = 0
{0x0100000000000000, 0x0FFFFFFFFFFFFFFF}, // Region with upper nibble = 0
{0x1000000000000000, 0x7FFFFFFFFFFFFFFF}, // Region for 63-bit IDs (upper bit = 0)
{0x8000000000000000, 0xFFFFFFFFFFFFFFFF}, // Region for true 64-bit IDs
}

for _, r := range regions {
for _, r := range _8byteRegions {
b := bounds(of, r.min, r.max, 8)
results = append(results, Boundary{
Min: b[shard-1],
@@ -134,3 +134,21 @@ func bounds(shards uint32, min, max uint64, dest int) [][]byte {

return bounds
}

func All(of uint32) [][]byte {
var boundaries [][]byte

for _, r := range _8byteRegions {
b := bounds(of, r.min, r.max, 8)
boundaries = append(boundaries, b[1:]...) // Drop the starting value, we just need the upper cutoffs
}

b := bounds(of, 0, math.MaxUint64, 0)

// Adjust max to be full 16-byte max
b[of] = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}

boundaries = append(boundaries, b[1:]...) // Drop the starting value, we just need the upper cutoffs

return boundaries
}
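
A minimal sketch of the even-split arithmetic that bounds presumably performs for each region (the real function also encodes each value into a trace-ID byte slice at the dest offset; this shows only the numeric cut points):

package main

import "fmt"

// cutPoints divides [min, max] into n contiguous buckets and returns the
// n+1 numeric boundaries, mirroring what bounds() is assumed to compute
// before byte-encoding each value.
func cutPoints(n uint32, min, max uint64) []uint64 {
	step := (max - min) / uint64(n)
	out := make([]uint64, 0, n+1)
	for i := uint32(0); i <= n; i++ {
		out = append(out, min+step*uint64(i))
	}
	out[n] = max // make the final boundary exact despite integer division
	return out
}

func main() {
	// Split the 63-bit region from the table above into 2 shards.
	for _, b := range cutPoints(2, 0x1000000000000000, 0x7FFFFFFFFFFFFFFF) {
		fmt.Printf("%#x\n", b)
	}
}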
41 changes: 32 additions & 9 deletions tempodb/compaction_block_selector.go
@@ -6,11 +6,12 @@ import (
"time"

"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
)

// CompactionBlockSelector is an interface for different algorithms to pick suitable blocks for compaction
type CompactionBlockSelector interface {
BlocksToCompact() ([]*backend.BlockMeta, string)
BlocksToCompact() common.CompactionRound
}

const (
@@ -43,7 +44,28 @@ type timeWindowBlockEntry struct {
hash string // hash string used for sharding ownership, preserves backwards compatibility
}

var _ (CompactionBlockSelector) = (*timeWindowBlockSelector)(nil)
type timeWindowCompaction struct {
blocks []*backend.BlockMeta
hash string
maxCompactionObjects int
}

func (c *timeWindowCompaction) Blocks() []*backend.BlockMeta {
return c.blocks
}

func (c *timeWindowCompaction) Ownership() string {
return c.hash
}

func (c *timeWindowCompaction) CutBlock(currBlock *backend.BlockMeta, _ common.ID) bool {
return c.maxCompactionObjects > 0 && currBlock.TotalObjects >= c.maxCompactionObjects
}

var (
_ (common.CompactionRound) = (*timeWindowCompaction)(nil)
_ (CompactionBlockSelector) = (*timeWindowBlockSelector)(nil)
)

func newTimeWindowBlockSelector(blocklist []*backend.BlockMeta, maxCompactionRange time.Duration, maxCompactionObjects int, maxBlockBytes uint64, minInputBlocks, maxInputBlocks int) CompactionBlockSelector {
twbs := &timeWindowBlockSelector{
@@ -116,7 +138,7 @@ func newTimeWindowBlockSelector(blocklist []*backend.BlockMeta, maxCompactionRan
return twbs
}

func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*backend.BlockMeta, string) {
func (twbs *timeWindowBlockSelector) BlocksToCompact() common.CompactionRound {
for len(twbs.entries) > 0 {
var chosen []timeWindowBlockEntry

@@ -149,16 +171,17 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*backend.BlockMeta, st

// did we find enough blocks?
if len(chosen) >= twbs.MinInputBlocks {

compactBlocks := make([]*backend.BlockMeta, 0)
res := timeWindowCompaction{
hash: chosen[0].hash,
maxCompactionObjects: twbs.MaxCompactionObjects,
}
for _, e := range chosen {
compactBlocks = append(compactBlocks, e.meta)
res.blocks = append(res.blocks, e.meta)
}

return compactBlocks, chosen[0].hash
return &res
}
}
return nil, ""
return &timeWindowCompaction{}
}

func totalObjects(entries []timeWindowBlockEntry) int {
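
The diff doesn't include common.CompactionRound itself, but its shape can be inferred from the timeWindowCompaction methods above; a hedged reconstruction (the real definition may differ):

package common

import "github.com/grafana/tempo/tempodb/backend"

// CompactionRound describes one unit of compaction work.
// Inferred from this PR's usage, not copied from the source.
type CompactionRound interface {
	// Blocks returns the input blocks to compact together.
	Blocks() []*backend.BlockMeta

	// Ownership returns the hash used to decide which compactor
	// in the ring owns this round.
	Ownership() string

	// CutBlock reports whether the current output block should be cut
	// before appending the object with the given ID, e.g. once the
	// configured object count is reached or a shard boundary is crossed.
	CutBlock(currBlock *backend.BlockMeta, id ID) bool
}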
12 changes: 6 additions & 6 deletions tempodb/compaction_block_selector_test.go
@@ -786,13 +786,13 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) {

selector := newTimeWindowBlockSelector(tt.blocklist, time.Second, 100, maxSize, min, max)

actual, hash := selector.BlocksToCompact()
assert.Equal(t, tt.expected, actual)
assert.Equal(t, tt.expectedHash, hash)
actual := selector.BlocksToCompact()
assert.Equal(t, tt.expected, actual.Blocks())
assert.Equal(t, tt.expectedHash, actual.Ownership())

actual, hash = selector.BlocksToCompact()
assert.Equal(t, tt.expectedSecond, actual)
assert.Equal(t, tt.expectedHash2, hash)
actual = selector.BlocksToCompact()
assert.Equal(t, tt.expectedSecond, actual.Blocks())
assert.Equal(t, tt.expectedHash2, actual.Ownership())
})
}
}