Skip to content

Commit

Permalink
#306 - Compactor flush to backend based on buffer size (#325)
Browse files Browse the repository at this point in the history
* Compactor flushes to backend based on buffer size instead of trace count. Expose configuration option. Default to 30MB

* Add compactor settings to docs

* Add entry to changelog
  • Loading branch information
mdisibio committed Nov 5, 2020
1 parent 522a6fd commit 09af806
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -5,5 +5,6 @@
* [ENHANCEMENT] CI checks for vendored dependencies using `make vendor-check`. Update CONTRIBUTING.md to reflect the same before checking in files in a PR. [#274](https://github.com/grafana/tempo/pull/274)
* [ENHANCEMENT] Add warnings for suspect configs. [#294](https://github.com/grafana/tempo/pull/294)
* [ENHANCEMENT] Add command line flags for s3 credentials. [#308](https://github.com/grafana/tempo/pull/308)
* [BUGFIX] S3 multi-part upload errors [#325](https://github.com/grafana/tempo/pull/325)
* [BUGFIX] Increase Prometheus `notfound` metric on tempo-vulture. [#301](https://github.com/grafana/tempo/pull/301)
* [BUGFIX] Return 404 if searching for a tenant id that does not exist in the backend. [#321](https://github.com/grafana/tempo/pull/321)
5 changes: 5 additions & 0 deletions cmd/tempo/app/app.go
Expand Up @@ -94,6 +94,11 @@ func (c *Config) CheckConfig() {
level.Warn(util.Logger).Log("msg", "compactor.compaction.compacted_block_timeout < storage.trace.blocklist_poll",
"explan", "Queriers and Compactors may attempt to read a block that no longer exists")
}

if c.StorageConfig.Trace.Backend == "s3" && c.Compactor.Compactor.FlushSizeBytes < 5242880 {
level.Warn(util.Logger).Log("msg", "c.Compactor.Compactor.FlushSizeBytes < 5242880",
"explan", "Compaction flush size should be 5MB or higher for S3 backend")
}
}

// App is the root datastructure.
Expand Down
9 changes: 7 additions & 2 deletions docs/tempo/website/configuration/_index.md
Expand Up @@ -41,12 +41,17 @@ ingester:
```

### [Compactor](https://github.com/grafana/tempo/blob/master/modules/compactor/config.go)
Compactors stream blocks from the storage backend, combine them and write them back.
Compactors stream blocks from the storage backend, combine them and write them back. Values shown below are the defaults.

```
compactor:
compaction:
block_retention: 336h # duration to keep blocks
block_retention: 336h # duration to keep blocks
compacted_block_retention: 1h # duration to keep blocks that have been compacted elsewhere
compaction_window: 1h # blocks in this time window will be compacted together
chunk_size_bytes: 10485760 # amount of data to buffer from input blocks
flush_size_bytes: 31457280 # flush data to backend when buffer is this large
max_compaction_objects: 1000000 # maximum traces in a compacted block
ring:
kvstore:
store: memberlist # in a high volume environment multiple compactors need to work together to keep up with incoming blocks.
Expand Down
1 change: 1 addition & 0 deletions example/docker-compose/etc/tempo-s3-minio.yaml
Expand Up @@ -29,6 +29,7 @@ compactor:
max_compaction_objects: 1000000 # maximum traces in a compacted block
block_retention: 1h
compacted_block_retention: 10m
flush_size_bytes: 5242880

storage:
trace:
Expand Down
5 changes: 3 additions & 2 deletions modules/compactor/config.go
Expand Up @@ -17,10 +17,11 @@ type Config struct {
OverrideRingKey string `yaml:"override_ring_key"`
}

// RegisterFlags registers the flags.
// RegisterFlagsAndApplyDefaults registers flags and applies default values.
func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
cfg.Compactor = tempodb.CompactorConfig{
ChunkSizeBytes: 10485760, // 10 MiB
ChunkSizeBytes: 10 * 1024 * 1024, // 10 MiB
FlushSizeBytes: 30 * 1024 * 1024, // 30 MiB
CompactedBlockRetention: time.Hour,
}

Expand Down
3 changes: 1 addition & 2 deletions tempodb/compactor.go
Expand Up @@ -38,7 +38,6 @@ const (
inputBlocks = 2
outputBlocks = 1

recordsPerBatch = 1000
compactionCycle = 30 * time.Second
)

Expand Down Expand Up @@ -182,7 +181,7 @@ func (rw *readerWriter) compact(blockMetas []*encoding.BlockMeta, tenantID strin
lowestBookmark.clear()

// write partial block
if currentBlock.Length()%recordsPerBatch == 0 {
if currentBlock.CurrentBufferLength() >= int(rw.compactorCfg.FlushSizeBytes) {
tracker, err = appendBlock(rw, tracker, currentBlock)
if err != nil {
return errors.Wrap(err, "error writing partial block")
Expand Down
1 change: 1 addition & 0 deletions tempodb/config.go
Expand Up @@ -28,6 +28,7 @@ type Config struct {

type CompactorConfig struct {
ChunkSizeBytes uint32 `yaml:"chunk_size_bytes"` // todo: do we need this?
FlushSizeBytes uint32 `yaml:"flush_size_bytes"`
MaxCompactionRange time.Duration `yaml:"compaction_window"`
MaxCompactionObjects int `yaml:"max_compaction_objects"`
BlockRetention time.Duration `yaml:"block_retention"`
Expand Down
4 changes: 4 additions & 0 deletions tempodb/wal/compactor_block.go
Expand Up @@ -65,6 +65,10 @@ func (c *CompactorBlock) CurrentBuffer() []byte {
return c.appendBuffer.Bytes()
}

// CurrentBufferLength returns the number of bytes currently held in the
// append buffer (the same data exposed by CurrentBuffer). The compactor
// uses this to decide when the buffered data is large enough to flush.
func (c *CompactorBlock) CurrentBufferLength() int {
	return c.appendBuffer.Len()
}

// ResetBuffer discards all buffered append data, emptying the buffer while
// retaining its underlying capacity for reuse. Call after the buffered
// contents have been written out.
func (c *CompactorBlock) ResetBuffer() {
	c.appendBuffer.Reset()
}
Expand Down

0 comments on commit 09af806

Please sign in to comment.