Compaction multi-threading #754

Merged
merged 12 commits on Jun 15, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
## main / unreleased

* [FEATURE] Added the ability to hedge requests with all backends [#750](https://github.com/grafana/tempo/pull/750) (@joe-elliott)
* [ENHANCEMENT] Performance: improve compaction speed with concurrent reads and writes [#754](https://github.com/grafana/tempo/pull/754) (@mdisibio)
* [ENHANCEMENT] Improve readability of cpu and memory metrics on operational dashboard [#764](https://github.com/grafana/tempo/pull/764) (@bboreham)

## v1.0.1
9 changes: 6 additions & 3 deletions docs/tempo/website/configuration/_index.md
@@ -187,21 +187,24 @@ compactor:
# Optional. Blocks in this time window will be compacted together. Default is 4h.
[compaction_window: <duration>]

# Optional. Amount of data to buffer from input blocks. Default is 10 MB
# Optional. Amount of data to buffer from input blocks. Default is 10 MB.
[chunk_size_bytes: <int>]

# Optional. Flush data to backend when buffer is this large. Default is 30 MB
# Optional. Flush data to backend when buffer is this large. Default is 30 MB.
[flush_size_bytes: <int>]

# Optional. Maximum number of traces in a compacted block. Default is 6 million.
# WARNING: Deprecated. Use max_block_bytes instead.
[max_compaction_objects: <int>]

# Optional. Maximum size of a compacted block in bytes. Default is 100 GB
# Optional. Maximum size of a compacted block in bytes. Default is 100 GB.
[max_block_bytes: <int>]

# Optional. Number of tenants to process in parallel during retention. Default is 10.
[retention_concurrency: <int>]

# Optional. Number of traces to buffer in memory during compaction. Increasing this value may improve performance but will also increase memory usage. Default is 1000.
[iterator_buffer_size: <int>]
```

## Storage
1 change: 1 addition & 0 deletions docs/tempo/website/configuration/manifest.md
@@ -226,6 +226,7 @@ compactor:
block_retention: 336h0m0s
compacted_block_retention: 1h0m0s
retention_concurrency: 10
iterator_buffer_size: 1000
override_ring_key: compactor
ingester:
lifecycler:
1 change: 1 addition & 0 deletions modules/compactor/config.go
@@ -24,6 +24,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
FlushSizeBytes: tempodb.DefaultFlushSizeBytes,
CompactedBlockRetention: time.Hour,
RetentionConcurrency: tempodb.DefaultRetentionConcurrency,
IteratorBufferSize: tempodb.DefaultIteratorBufferSize,
}

flagext.DefaultValues(&cfg.ShardingRing)
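For orientation, the new default flows into tempodb's compactor configuration. The struct that receives it is defined in tempodb/config.go, which is not part of this diff, so the sketch below is only a plausible shape inferred from the option names documented above:

```go
// Hypothetical excerpt of tempodb's CompactorConfig: only the IteratorBufferSize
// field name and the iterator_buffer_size key are confirmed by this PR; the
// exact type and the neighbouring fields shown here are assumptions.
package compactionsketch

type CompactorConfig struct {
	ChunkSizeBytes     uint32 `yaml:"chunk_size_bytes"`
	FlushSizeBytes     uint32 `yaml:"flush_size_bytes"`
	IteratorBufferSize int    `yaml:"iterator_buffer_size"` // traces buffered in memory during compaction
	// ... remaining compaction and retention options elided
}
```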
93 changes: 42 additions & 51 deletions tempodb/compactor.go
@@ -1,7 +1,6 @@
package tempodb

import (
"bytes"
"context"
"fmt"
"io"
@@ -15,6 +14,7 @@ import (
"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
@@ -54,6 +54,8 @@
compactionCycle = 30 * time.Second

DefaultFlushSizeBytes uint32 = 30 * 1024 * 1024 // 30 MiB

DefaultIteratorBufferSize = 1000
)

// todo: pass a context/chan in to cancel this cleanly
@@ -116,8 +118,6 @@ func (rw *readerWriter) doCompaction() {
}
}

// todo : this method is brittle and has weird failure conditions. if it fails after it has written a new block then it will not clean up the old
// in these cases it's possible that the compact method actually will start making more blocks.
func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string) error {
level.Debug(rw.logger).Log("msg", "beginning compaction", "num blocks compacting", len(blockMetas))

@@ -133,13 +133,13 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string
nextCompactionLevel := compactionLevel + 1

var err error
bookmarks := make([]*bookmark, 0, len(blockMetas))
iters := make([]encoding.Iterator, 0, len(blockMetas))

// cleanup compaction
defer func() {
level.Info(rw.logger).Log("msg", "compaction complete")
for _, bm := range bookmarks {
bm.iter.Close()
for _, iter := range iters {
iter.Close()
}
}()

@@ -150,59 +150,48 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string
totalRecords += blockMeta.TotalObjects
dataEncoding = blockMeta.DataEncoding // blocks chosen for compaction always have the same data encoding

block, err := encoding.NewBackendBlock(blockMeta, rw.r)
// Make sure block still exists
_, err = rw.r.BlockMeta(ctx, blockMeta.BlockID, tenantID)
if err != nil {
return err
}

iter, err := block.Iterator(rw.compactorCfg.ChunkSizeBytes)
// Open iterator
block, err := encoding.NewBackendBlock(blockMeta, rw.r)
if err != nil {
return err
}

bookmarks = append(bookmarks, newBookmark(iter))

_, err = rw.r.BlockMeta(ctx, blockMeta.BlockID, tenantID)
iter, err := block.Iterator(rw.compactorCfg.ChunkSizeBytes)
if err != nil {
return err
}

iters = append(iters, iter)
}

recordsPerBlock := (totalRecords / outputBlocks)
var newCompactedBlocks []*backend.BlockMeta
var currentBlock *encoding.StreamingBlock
var tracker backend.AppendTracker

for !allDone(ctx, bookmarks) {
var lowestID []byte
var lowestObject []byte
var lowestBookmark *bookmark

// find lowest ID of the new object
for _, b := range bookmarks {
currentID, currentObject, err := b.current(ctx)
if err == io.EOF {
continue
} else if err != nil {
return err
}
combiner := instrumentedObjectCombiner{
inner: rw.compactorSharder,
compactionLevelLabel: compactionLevelLabel,
}

if bytes.Equal(currentID, lowestID) {
var wasCombined bool
lowestObject, wasCombined = rw.compactorSharder.Combine(currentObject, lowestObject, dataEncoding)
if wasCombined {
metricCompactionObjectsCombined.WithLabelValues(compactionLevelLabel).Inc()
}
b.clear()
} else if len(lowestID) == 0 || bytes.Compare(currentID, lowestID) == -1 {
lowestID = currentID
lowestObject = currentObject
lowestBookmark = b
}
iter := encoding.NewMultiblockIterator(ctx, iters, rw.compactorCfg.IteratorBufferSize, combiner, dataEncoding)
defer iter.Close()

for {

id, body, err := iter.Next(ctx)
if err == io.EOF {
break
}

if len(lowestID) == 0 || len(lowestObject) == 0 || lowestBookmark == nil {
return fmt.Errorf("failed to find a lowest object in compaction")
if err != nil {
return errors.Wrap(err, "error iterating input blocks")
}

// make a new block if necessary
@@ -215,13 +204,10 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string
newCompactedBlocks = append(newCompactedBlocks, currentBlock.BlockMeta())
}

// writing to the current block will cause the id to escape the iterator so we need to make a copy of it
writeID := append([]byte(nil), lowestID...)
err = currentBlock.AddObject(writeID, lowestObject)
err = currentBlock.AddObject(id, body)
if err != nil {
return err
}
lowestBookmark.clear()

// write partial block
if currentBlock.CurrentBufferLength() >= int(rw.compactorCfg.FlushSizeBytes) {
@@ -284,15 +270,6 @@ func finishBlock(rw *readerWriter, tracker backend.AppendTracker, block *encodin
return nil
}

func allDone(ctx context.Context, bookmarks []*bookmark) bool {
for _, b := range bookmarks {
if !b.done(ctx) {
return false
}
}
return true
}

func compactionLevelForBlocks(blockMetas []*backend.BlockMeta) uint8 {
level := uint8(0)

@@ -326,3 +303,17 @@ func markCompacted(rw *readerWriter, tenantID string, oldBlocks []*backend.Block
// Update blocklist in memory
rw.updateBlocklist(tenantID, newBlocks, oldBlocks, newCompactions)
}

type instrumentedObjectCombiner struct {
compactionLevelLabel string
inner common.ObjectCombiner
}

// Combine wraps the inner combiner and increments the objects-combined metric when two objects are merged
func (i instrumentedObjectCombiner) Combine(objA []byte, objB []byte, dataEncoding string) ([]byte, bool) {
b, wasCombined := i.inner.Combine(objA, objB, dataEncoding)
if wasCombined {
metricCompactionObjectsCombined.WithLabelValues(i.compactionLevelLabel).Inc()
}
return b, wasCombined
}
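The merge logic removed from compact() above now lives behind encoding.NewMultiblockIterator, whose implementation sits in tempodb/encoding and is not shown in this view. As a rough sketch of the technique only, with every name below invented, the idea is a producer goroutine that performs the old lowest-ID merge, combines objects sharing an ID, and hands results to the compact loop through a channel whose capacity is the new iterator_buffer_size:

```go
// multiblock_iterator_sketch.go: illustrative only, not Tempo's actual code.
// Function-typed stand-ins replace encoding.Iterator and common.ObjectCombiner
// so the sketch stays self-contained.
package compactionsketch

import (
	"bytes"
	"context"
	"io"
)

// nextFunc mimics a block iterator: it yields (id, object) pairs in ascending
// ID order and io.EOF once the block is exhausted.
type nextFunc func(ctx context.Context) (id, obj []byte, err error)

// combineFunc mimics common.ObjectCombiner.Combine.
type combineFunc func(objA, objB []byte, dataEncoding string) ([]byte, bool)

type result struct{ id, obj []byte; err error }

// multiblockIterator starts a producer goroutine that merges the inputs by
// lowest ID and buffers up to bufferSize merged traces (iterator_buffer_size),
// so reads from the backend overlap with writes of the compacted block. The
// consumer receives from the returned channel; a closed channel plays the
// role of io.EOF in the compact loop above.
func multiblockIterator(ctx context.Context, inputs []nextFunc, bufferSize int, combine combineFunc, enc string) <-chan result {
	ch := make(chan result, bufferSize)

	// send delivers one result unless the context is cancelled.
	send := func(r result) bool {
		select {
		case ch <- r:
			return true
		case <-ctx.Done():
			return false
		}
	}

	go func() {
		defer close(ch)

		// One pending object per input block.
		type head struct{ id, obj []byte; eof bool }
		heads := make([]head, len(inputs))

		// advance pulls the next object from input i into heads[i].
		advance := func(i int) bool {
			id, obj, err := inputs[i](ctx)
			switch {
			case err == io.EOF:
				heads[i] = head{eof: true}
			case err != nil:
				send(result{err: err})
				return false
			default:
				heads[i] = head{id: id, obj: obj}
			}
			return true
		}
		for i := range inputs {
			if !advance(i) {
				return
			}
		}

		for {
			// Pick the input currently holding the lowest ID.
			lowest := -1
			for i := range heads {
				if !heads[i].eof && (lowest < 0 || bytes.Compare(heads[i].id, heads[lowest].id) < 0) {
					lowest = i
				}
			}
			if lowest < 0 {
				return // every input is drained
			}

			id, obj := heads[lowest].id, heads[lowest].obj
			if !advance(lowest) {
				return
			}

			// Fold in objects from other blocks that share this ID.
			for i := range heads {
				if heads[i].eof || !bytes.Equal(heads[i].id, id) {
					continue
				}
				obj, _ = combine(obj, heads[i].obj, enc)
				if !advance(i) {
					return
				}
			}

			if !send(result{id: id, obj: obj}) {
				return
			}
		}
	}()

	return ch
}
```

The bounded channel is what ties iterator_buffer_size to the memory and performance trade-off noted in the docs: backend reads can run ahead of the block being written by at most that many traces before the producer blocks.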
45 changes: 0 additions & 45 deletions tempodb/compactor_bookmark.go

This file was deleted.
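The deleted bookmark type itself is not visible in this view. Judging only from the call sites removed from compactor.go above (newBookmark, current, clear, done and iter.Close), it was presumably a small cursor that caches one pending object per block until the merge loop consumes it; the hypothetical reconstruction below shows that shape, which the buffered multi-block iterator now makes unnecessary:

```go
// bookmark_sketch.go: hypothetical reconstruction of the deleted type, based
// only on the call sites removed from compactor.go in this PR.
package compactionsketch

import "context"

// blockIterator stands in for the per-block iterator the bookmark wrapped.
type blockIterator interface {
	Next(ctx context.Context) (id, obj []byte, err error)
	Close()
}

// bookmark caches the current (id, object) of one block until the merge loop
// consumes it via clear().
type bookmark struct {
	iter blockIterator

	currentID     []byte
	currentObject []byte
	currentErr    error
}

func newBookmark(iter blockIterator) *bookmark {
	return &bookmark{iter: iter}
}

// current returns the cached object, pulling the next one from the iterator
// when the cache is empty.
func (b *bookmark) current(ctx context.Context) ([]byte, []byte, error) {
	if len(b.currentID) != 0 && len(b.currentObject) != 0 {
		return b.currentID, b.currentObject, nil
	}
	if b.currentErr != nil {
		return nil, nil, b.currentErr
	}
	b.currentID, b.currentObject, b.currentErr = b.iter.Next(ctx)
	return b.currentID, b.currentObject, b.currentErr
}

// done reports whether this block has nothing left to contribute.
func (b *bookmark) done(ctx context.Context) bool {
	_, _, err := b.current(ctx)
	return err != nil
}

// clear drops the cached object once it has been written to the output block.
func (b *bookmark) clear() {
	b.currentID = nil
	b.currentObject = nil
}
```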

92 changes: 0 additions & 92 deletions tempodb/compactor_bookmark_test.go

This file was deleted.
