Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ instructions below to upgrade your Postgres.
* `--experimental.tsdb.max-tsdb-opening-concurrency-on-startup`
* [ENHANCEMENT] Experimental TSDB: Added `cortex_ingester_shipper_dir_syncs_total`, `cortex_ingester_shipper_dir_sync_failures_total`, `cortex_ingester_shipper_uploads_total` and `cortex_ingester_shipper_upload_failures_total` metrics from TSDB shipper component. #1983
* [ENHANCEMENT] Experimental TSDB: Querier now exports aggregate metrics from Thanos bucket store and in memory index cache (too many metrics to list, but all have `cortex_querier_bucket_store_` or `cortex_querier_blocks_index_cache_` prefix). #1996
* [ENHANCEMENT] Experimental TSDB: Export TSDB Syncer metrics from Compactor component. They are prefixed with `cortex_compactor_`. #2023
* [ENHANCEMENT] Experimental TSDB: Improved multi-tenant bucket store. #1991
* Allowed to configure the blocks sync interval via `-experimental.tsdb.bucket-store.sync-interval` (0 disables the sync)
* Limited the number of tenants concurrently synced by `-experimental.tsdb.bucket-store.block-sync-concurrency`
Expand Down
9 changes: 8 additions & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type Compactor struct {
compactionRunsStarted prometheus.Counter
compactionRunsCompleted prometheus.Counter
compactionRunsFailed prometheus.Counter

// TSDB syncer metrics
syncerMetrics *syncerMetrics
}

// NewCompactor makes a new Compactor.
Expand Down Expand Up @@ -132,6 +135,7 @@ func newCompactor(
// Register metrics.
if registerer != nil {
registerer.MustRegister(c.compactionRunsStarted, c.compactionRunsCompleted, c.compactionRunsFailed)
c.syncerMetrics = newSyncerMetrics(registerer)
}

// Start the compactor loop.
Expand Down Expand Up @@ -220,9 +224,12 @@ func (c *Compactor) compactUsers(ctx context.Context) bool {
func (c *Compactor) compactUser(ctx context.Context, userID string) error {
bucket := cortex_tsdb.NewUserBucketClient(userID, c.bucketClient)

reg := prometheus.NewRegistry()
defer c.syncerMetrics.gatherThanosSyncerMetrics(reg)

syncer, err := compact.NewSyncer(
c.logger,
nil, // TODO(pracucci) we should pass the prometheus registerer, but we would need to inject the user label to each metric, otherwise we have clashing metrics
reg,
bucket,
c.compactorCfg.ConsistencyDelay,
c.compactorCfg.BlockSyncConcurrency,
Expand Down
109 changes: 108 additions & 1 deletion pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,58 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
# TYPE cortex_compactor_runs_failed_total counter
# HELP cortex_compactor_runs_failed_total Total number of compaction runs failed.
cortex_compactor_runs_failed_total 0

# HELP cortex_compactor_garbage_collected_blocks_total TSDB Syncer: Total number of deleted blocks by compactor.
# TYPE cortex_compactor_garbage_collected_blocks_total counter
cortex_compactor_garbage_collected_blocks_total 0

# HELP cortex_compactor_garbage_collection_duration_seconds TSDB Syncer: Time it took to perform garbage collection iteration.
# TYPE cortex_compactor_garbage_collection_duration_seconds histogram
cortex_compactor_garbage_collection_duration_seconds_bucket{le="+Inf"} 0
cortex_compactor_garbage_collection_duration_seconds_sum 0
cortex_compactor_garbage_collection_duration_seconds_count 0

# HELP cortex_compactor_garbage_collection_failures_total TSDB Syncer: Total number of failed garbage collection operations.
# TYPE cortex_compactor_garbage_collection_failures_total counter
cortex_compactor_garbage_collection_failures_total 0

# HELP cortex_compactor_garbage_collection_total TSDB Syncer: Total number of garbage collection operations.
# TYPE cortex_compactor_garbage_collection_total counter
cortex_compactor_garbage_collection_total 0

# HELP cortex_compactor_sync_meta_duration_seconds TSDB Syncer: Time it took to sync meta files.
# TYPE cortex_compactor_sync_meta_duration_seconds histogram
cortex_compactor_sync_meta_duration_seconds_bucket{le="+Inf"} 0
cortex_compactor_sync_meta_duration_seconds_sum 0
cortex_compactor_sync_meta_duration_seconds_count 0

# HELP cortex_compactor_sync_meta_failures_total TSDB Syncer: Total number of failed sync meta operations.
# TYPE cortex_compactor_sync_meta_failures_total counter
cortex_compactor_sync_meta_failures_total 0

# HELP cortex_compactor_sync_meta_total TSDB Syncer: Total number of sync meta operations.
# TYPE cortex_compactor_sync_meta_total counter
cortex_compactor_sync_meta_total 0

# HELP cortex_compactor_group_compaction_runs_completed_total TSDB Syncer: Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction.
# TYPE cortex_compactor_group_compaction_runs_completed_total counter
cortex_compactor_group_compaction_runs_completed_total 0

# HELP cortex_compactor_group_compaction_runs_started_total TSDB Syncer: Total number of group compaction attempts.
# TYPE cortex_compactor_group_compaction_runs_started_total counter
cortex_compactor_group_compaction_runs_started_total 0

# HELP cortex_compactor_group_compactions_failures_total TSDB Syncer: Total number of failed group compactions.
# TYPE cortex_compactor_group_compactions_failures_total counter
cortex_compactor_group_compactions_failures_total 0

# HELP cortex_compactor_group_compactions_total TSDB Syncer: Total number of group compaction attempts that resulted in a new block.
# TYPE cortex_compactor_group_compactions_total counter
cortex_compactor_group_compactions_total 0

# HELP cortex_compactor_group_vertical_compactions_total TSDB Syncer: Total number of group compaction attempts that resulted in a new block based on overlapping blocks.
# TYPE cortex_compactor_group_vertical_compactions_total counter
cortex_compactor_group_vertical_compactions_total 0
`)))
}

Expand Down Expand Up @@ -140,6 +192,58 @@ func TestCompactor_ShouldRetryOnFailureWhileDiscoveringUsersFromBucket(t *testin
# TYPE cortex_compactor_runs_failed_total counter
# HELP cortex_compactor_runs_failed_total Total number of compaction runs failed.
cortex_compactor_runs_failed_total 1

# HELP cortex_compactor_garbage_collected_blocks_total TSDB Syncer: Total number of deleted blocks by compactor.
# TYPE cortex_compactor_garbage_collected_blocks_total counter
cortex_compactor_garbage_collected_blocks_total 0

# HELP cortex_compactor_garbage_collection_duration_seconds TSDB Syncer: Time it took to perform garbage collection iteration.
# TYPE cortex_compactor_garbage_collection_duration_seconds histogram
cortex_compactor_garbage_collection_duration_seconds_bucket{le="+Inf"} 0
cortex_compactor_garbage_collection_duration_seconds_sum 0
cortex_compactor_garbage_collection_duration_seconds_count 0

# HELP cortex_compactor_garbage_collection_failures_total TSDB Syncer: Total number of failed garbage collection operations.
# TYPE cortex_compactor_garbage_collection_failures_total counter
cortex_compactor_garbage_collection_failures_total 0

# HELP cortex_compactor_garbage_collection_total TSDB Syncer: Total number of garbage collection operations.
# TYPE cortex_compactor_garbage_collection_total counter
cortex_compactor_garbage_collection_total 0

# HELP cortex_compactor_sync_meta_duration_seconds TSDB Syncer: Time it took to sync meta files.
# TYPE cortex_compactor_sync_meta_duration_seconds histogram
cortex_compactor_sync_meta_duration_seconds_bucket{le="+Inf"} 0
cortex_compactor_sync_meta_duration_seconds_sum 0
cortex_compactor_sync_meta_duration_seconds_count 0

# HELP cortex_compactor_sync_meta_failures_total TSDB Syncer: Total number of failed sync meta operations.
# TYPE cortex_compactor_sync_meta_failures_total counter
cortex_compactor_sync_meta_failures_total 0

# HELP cortex_compactor_sync_meta_total TSDB Syncer: Total number of sync meta operations.
# TYPE cortex_compactor_sync_meta_total counter
cortex_compactor_sync_meta_total 0

# HELP cortex_compactor_group_compaction_runs_completed_total TSDB Syncer: Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction.
# TYPE cortex_compactor_group_compaction_runs_completed_total counter
cortex_compactor_group_compaction_runs_completed_total 0

# HELP cortex_compactor_group_compaction_runs_started_total TSDB Syncer: Total number of group compaction attempts.
# TYPE cortex_compactor_group_compaction_runs_started_total counter
cortex_compactor_group_compaction_runs_started_total 0

# HELP cortex_compactor_group_compactions_failures_total TSDB Syncer: Total number of failed group compactions.
# TYPE cortex_compactor_group_compactions_failures_total counter
cortex_compactor_group_compactions_failures_total 0

# HELP cortex_compactor_group_compactions_total TSDB Syncer: Total number of group compaction attempts that resulted in a new block.
# TYPE cortex_compactor_group_compactions_total counter
cortex_compactor_group_compactions_total 0

# HELP cortex_compactor_group_vertical_compactions_total TSDB Syncer: Total number of group compaction attempts that resulted in a new block based on overlapping blocks.
# TYPE cortex_compactor_group_vertical_compactions_total counter
cortex_compactor_group_vertical_compactions_total 0
`)))
}

Expand Down Expand Up @@ -188,6 +292,9 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
`level=info msg="successfully compacted user blocks" user=user-2`,
}, strings.Split(strings.TrimSpace(logs.String()), "\n"))

// Instead of testing for TSDB syncer metrics, we only check our own compactor run metrics here.
// Real syncer metrics are too variable to embed into a test.
testedMetrics := []string{"cortex_compactor_runs_started_total", "cortex_compactor_runs_completed_total", "cortex_compactor_runs_failed_total"}
assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
# TYPE cortex_compactor_runs_started_total counter
# HELP cortex_compactor_runs_started_total Total number of compaction runs started.
Expand All @@ -200,7 +307,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
# TYPE cortex_compactor_runs_failed_total counter
# HELP cortex_compactor_runs_failed_total Total number of compaction runs failed.
cortex_compactor_runs_failed_total 0
`)))
`), testedMetrics...))
}

func prepare(t *testing.T) (*Compactor, *cortex_tsdb.BucketClientMock, *tsdbCompactorMock, *bytes.Buffer, prometheus.Gatherer) {
Expand Down
132 changes: 132 additions & 0 deletions pkg/compactor/syncer_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package compactor

import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)

// syncerMetrics mirrors the syncer metric set found in Thanos
// (pkg/compact/compact.go). It holds the aggregated values collected from all
// finished syncers.
type syncerMetrics struct {
	// Meta sync operations.
	syncMetas, syncMetaFailures prometheus.Counter
	// Was a prometheus.Histogram in the Thanos original.
	syncMetaDuration *util.HistogramDataCollector

	// Garbage collection.
	garbageCollectedBlocks, garbageCollections, garbageCollectionFailures prometheus.Counter
	// Was a prometheus.Histogram in the Thanos original.
	garbageCollectionDuration *util.HistogramDataCollector

	// Group compactions.
	compactions, compactionRunsStarted, compactionRunsCompleted prometheus.Counter
	compactionFailures, verticalCompactions                     prometheus.Counter
}

// newSyncerMetrics creates the aggregated syncer collectors. The metric set is
// copied from Thanos (pkg/compact/compact.go), renamed with a Cortex prefix,
// and the "group" label is dropped since only a single group is used.
//
// When reg is non-nil every collector is registered on it (MustRegister, so a
// registration conflict panics). The returned value is never nil.
func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
	// Small constructors to keep the definitions below down to name + help.
	counter := func(name, help string) prometheus.Counter {
		return prometheus.NewCounter(prometheus.CounterOpts{Name: name, Help: help})
	}
	histogram := func(name, help string) *util.HistogramDataCollector {
		return util.NewHistogramDataCollector(prometheus.NewDesc(name, help, nil, nil))
	}

	metrics := &syncerMetrics{
		syncMetas: counter(
			"cortex_compactor_sync_meta_total",
			"TSDB Syncer: Total number of sync meta operations.",
		),
		syncMetaFailures: counter(
			"cortex_compactor_sync_meta_failures_total",
			"TSDB Syncer: Total number of failed sync meta operations.",
		),
		syncMetaDuration: histogram(
			"cortex_compactor_sync_meta_duration_seconds",
			"TSDB Syncer: Time it took to sync meta files.",
		),

		garbageCollectedBlocks: counter(
			"cortex_compactor_garbage_collected_blocks_total",
			"TSDB Syncer: Total number of deleted blocks by compactor.",
		),
		garbageCollections: counter(
			"cortex_compactor_garbage_collection_total",
			"TSDB Syncer: Total number of garbage collection operations.",
		),
		garbageCollectionFailures: counter(
			"cortex_compactor_garbage_collection_failures_total",
			"TSDB Syncer: Total number of failed garbage collection operations.",
		),
		garbageCollectionDuration: histogram(
			"cortex_compactor_garbage_collection_duration_seconds",
			"TSDB Syncer: Time it took to perform garbage collection iteration.",
		),

		compactions: counter(
			"cortex_compactor_group_compactions_total",
			"TSDB Syncer: Total number of group compaction attempts that resulted in a new block.",
		),
		compactionRunsStarted: counter(
			"cortex_compactor_group_compaction_runs_started_total",
			"TSDB Syncer: Total number of group compaction attempts.",
		),
		compactionRunsCompleted: counter(
			"cortex_compactor_group_compaction_runs_completed_total",
			"TSDB Syncer: Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction.",
		),
		compactionFailures: counter(
			"cortex_compactor_group_compactions_failures_total",
			"TSDB Syncer: Total number of failed group compactions.",
		),
		verticalCompactions: counter(
			"cortex_compactor_group_vertical_compactions_total",
			"TSDB Syncer: Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
		),
	}

	// Without a registerer the collectors are still usable, just not exported.
	if reg == nil {
		return metrics
	}

	reg.MustRegister(
		metrics.syncMetas,
		metrics.syncMetaFailures,
		metrics.syncMetaDuration,
		metrics.garbageCollectedBlocks,
		metrics.garbageCollections,
		metrics.garbageCollectionFailures,
		metrics.garbageCollectionDuration,
		metrics.compactions,
		metrics.compactionRunsStarted,
		metrics.compactionRunsCompleted,
		metrics.compactionFailures,
		metrics.verticalCompactions,
	)
	return metrics
}

// gatherThanosSyncerMetrics gathers the thanos_compact_* metrics from reg and
// adds their values to the aggregated cortex_compactor_* collectors held by m.
//
// It is a no-op when m is nil (so it can be called on an unset *syncerMetrics)
// or when reg is nil. Failures are logged as warnings on util.Logger and
// nothing is added in that case; no error is returned to the caller.
func (m *syncerMetrics) gatherThanosSyncerMetrics(reg *prometheus.Registry) {
	// Calling Gather on a nil *prometheus.Registry would panic, so guard both.
	if m == nil || reg == nil {
		return
	}

	mf, err := reg.Gather()
	if err != nil {
		level.Warn(util.Logger).Log("msg", "failed to gather metrics from syncer registry after compaction", "err", err)
		return
	}

	mfm, err := util.NewMetricFamilyMap(mf)
	if err != nil {
		// Distinct message: the gather succeeded, building the lookup map failed.
		level.Warn(util.Logger).Log("msg", "failed to build metric family map from syncer registry metrics after compaction", "err", err)
		return
	}

	m.syncMetas.Add(mfm.SumCounters("thanos_compact_sync_meta_total"))
	m.syncMetaFailures.Add(mfm.SumCounters("thanos_compact_sync_meta_failures_total"))
	m.syncMetaDuration.Add(mfm.SumHistograms("thanos_compact_sync_meta_duration_seconds"))
	m.garbageCollectedBlocks.Add(mfm.SumCounters("thanos_compact_garbage_collected_blocks_total"))
	m.garbageCollections.Add(mfm.SumCounters("thanos_compact_garbage_collection_total"))
	m.garbageCollectionFailures.Add(mfm.SumCounters("thanos_compact_garbage_collection_failures_total"))
	m.garbageCollectionDuration.Add(mfm.SumHistograms("thanos_compact_garbage_collection_duration_seconds"))

	// These metrics have "group" label, but we sum them all together.
	m.compactions.Add(mfm.SumCounters("thanos_compact_group_compactions_total"))
	m.compactionRunsStarted.Add(mfm.SumCounters("thanos_compact_group_compaction_runs_started_total"))
	m.compactionRunsCompleted.Add(mfm.SumCounters("thanos_compact_group_compaction_runs_completed_total"))
	m.compactionFailures.Add(mfm.SumCounters("thanos_compact_group_compactions_failures_total"))
	m.verticalCompactions.Add(mfm.SumCounters("thanos_compact_group_vertical_compactions_total"))
}
Loading