diff --git a/CHANGELOG.md b/CHANGELOG.md index 2384f4dc52c..6f2972c05c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,5 +5,6 @@ * [ENHANCEMENT] CI checks for vendored dependencies using `make vendor-check`. Update CONTRIBUTING.md to reflect the same before checking in files in a PR. [#274](https://github.com/grafana/tempo/pull/274) * [ENHANCEMENT] Add warnings for suspect configs. [#294](https://github.com/grafana/tempo/pull/294) * [ENHANCEMENT] Add command line flags for s3 credentials. [#308](https://github.com/grafana/tempo/pull/308) +* [BUGFIX] S3 multi-part upload errors [#325](https://github.com/grafana/tempo/pull/325) * [BUGFIX] Increase Prometheus `notfound` metric on tempo-vulture. [#301](https://github.com/grafana/tempo/pull/301) * [BUGFIX] Return 404 if searching for a tenant id that does not exist in the backend. [#321](https://github.com/grafana/tempo/pull/321) diff --git a/cmd/tempo/app/app.go b/cmd/tempo/app/app.go index 66eff67925a..fe0de180cba 100644 --- a/cmd/tempo/app/app.go +++ b/cmd/tempo/app/app.go @@ -94,6 +94,11 @@ func (c *Config) CheckConfig() { level.Warn(util.Logger).Log("msg", "compactor.compaction.compacted_block_timeout < storage.trace.blocklist_poll", "explan", "Queriers and Compactors may attempt to read a block that no longer exists") } + + if c.StorageConfig.Trace.Backend == "s3" && c.Compactor.Compactor.FlushSizeBytes < 5242880 { + level.Warn(util.Logger).Log("msg", "c.Compactor.Compactor.FlushSizeBytes < 5242880", + "explan", "Compaction flush size should be 5MB or higher for S3 backend") + } } // App is the root datastructure. 
diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md index 0a93acfa1d2..3379d230642 100644 --- a/docs/tempo/website/configuration/_index.md +++ b/docs/tempo/website/configuration/_index.md @@ -41,12 +41,17 @@ ingester: ``` ### [Compactor](https://github.com/grafana/tempo/blob/master/modules/compactor/config.go) -Compactors stream blocks from the storage backend, combine them and write them back. +Compactors stream blocks from the storage backend, combine them and write them back. Values shown below are the defaults. ``` compactor: compaction: - block_retention: 336h # duration to keep blocks + block_retention: 336h # duration to keep blocks + compacted_block_retention: 1h # duration to keep blocks that have been compacted elsewhere + compaction_window: 1h # blocks in this time window will be compacted together + chunk_size_bytes: 10485760 # amount of data to buffer from input blocks + flush_size_bytes: 31457280 # flush data to backend when buffer is this large + max_compaction_objects: 1000000 # maximum traces in a compacted block ring: kvstore: store: memberlist # in a high volume environment multiple compactors need to work together to keep up with incoming blocks. diff --git a/example/docker-compose/etc/tempo-s3-minio.yaml b/example/docker-compose/etc/tempo-s3-minio.yaml index ab19042c19f..685fa1e21d0 100644 --- a/example/docker-compose/etc/tempo-s3-minio.yaml +++ b/example/docker-compose/etc/tempo-s3-minio.yaml @@ -29,6 +29,7 @@ compactor: max_compaction_objects: 1000000 # maximum size of compacted blocks block_retention: 1h compacted_block_retention: 10m + flush_size_bytes: 5242880 storage: trace: diff --git a/modules/compactor/config.go b/modules/compactor/config.go index b74c70197db..63a845f5bfe 100644 --- a/modules/compactor/config.go +++ b/modules/compactor/config.go @@ -17,10 +17,11 @@ type Config struct { OverrideRingKey string `yaml:"override_ring_key"` } -// RegisterFlags registers the flags. 
+// RegisterFlagsAndApplyDefaults registers the flags. func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { cfg.Compactor = tempodb.CompactorConfig{ - ChunkSizeBytes: 10485760, // 10 MiB + ChunkSizeBytes: 10 * 1024 * 1024, // 10 MiB + FlushSizeBytes: 30 * 1024 * 1024, // 30 MiB CompactedBlockRetention: time.Hour, } diff --git a/tempodb/compactor.go b/tempodb/compactor.go index 9d112620a67..415883a55f0 100644 --- a/tempodb/compactor.go +++ b/tempodb/compactor.go @@ -38,7 +38,6 @@ const ( inputBlocks = 2 outputBlocks = 1 - recordsPerBatch = 1000 compactionCycle = 30 * time.Second ) @@ -182,7 +181,7 @@ func (rw *readerWriter) compact(blockMetas []*encoding.BlockMeta, tenantID strin lowestBookmark.clear() // write partial block - if currentBlock.Length()%recordsPerBatch == 0 { + if currentBlock.CurrentBufferLength() >= int(rw.compactorCfg.FlushSizeBytes) { tracker, err = appendBlock(rw, tracker, currentBlock) if err != nil { return errors.Wrap(err, "error writing partial block") diff --git a/tempodb/config.go b/tempodb/config.go index ca425104bae..d561de45016 100644 --- a/tempodb/config.go +++ b/tempodb/config.go @@ -28,6 +28,7 @@ type Config struct { type CompactorConfig struct { ChunkSizeBytes uint32 `yaml:"chunk_size_bytes"` // todo: do we need this? + FlushSizeBytes uint32 `yaml:"flush_size_bytes"` MaxCompactionRange time.Duration `yaml:"compaction_window"` MaxCompactionObjects int `yaml:"max_compaction_objects"` BlockRetention time.Duration `yaml:"block_retention"` diff --git a/tempodb/wal/compactor_block.go b/tempodb/wal/compactor_block.go index 797fe297da1..bf0d9a02963 100644 --- a/tempodb/wal/compactor_block.go +++ b/tempodb/wal/compactor_block.go @@ -65,6 +65,10 @@ func (c *CompactorBlock) CurrentBuffer() []byte { return c.appendBuffer.Bytes() } +func (c *CompactorBlock) CurrentBufferLength() int { + return c.appendBuffer.Len() +} + func (c *CompactorBlock) ResetBuffer() { c.appendBuffer.Reset() }