Skip to content

Commit

Permalink
Fix cortex_ingest_storage_reader_last_consumed_offset metrics initialization (grafana#7733)
Browse files Browse the repository at this point in the history

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci committed Mar 26, 2024
1 parent 4de8352 commit 092aee1
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 8 deletions.
15 changes: 10 additions & 5 deletions pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,15 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetric
Buckets: prometheus.ExponentialBuckets(0.125, 2, 18), // Buckets between 125ms and 9h.
}, []string{"phase"})

lastConsumedOffset := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_ingest_storage_reader_last_consumed_offset",
Help: "The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet.",
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(partitionID))},
})

// Initialise the last consumed offset metric to -1 to signal no offset has been consumed yet (0 is a valid offset).
lastConsumedOffset.Set(-1)

return readerMetrics{
receiveDelayWhenStarting: receiveDelay.WithLabelValues("starting"),
receiveDelayWhenRunning: receiveDelay.WithLabelValues("running"),
Expand Down Expand Up @@ -691,11 +700,7 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetric
NativeHistogramMinResetDuration: 1 * time.Hour,
Buckets: prometheus.DefBuckets,
}),
lastConsumedOffset: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_ingest_storage_reader_last_consumed_offset",
Help: "The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet.",
ConstLabels: prometheus.Labels{"partition": strconv.Itoa(int(partitionID))},
}),
lastConsumedOffset: lastConsumedOffset,
kprom: kprom.NewMetrics("cortex_ingest_storage_reader",
kprom.Registerer(prometheus.WrapRegistererWith(prometheus.Labels{"partition": strconv.Itoa(int(partitionID))}, reg)),
// Do not export the client ID, because we use it to specify options to the backend.
Expand Down
32 changes: 29 additions & 3 deletions pkg/storage/ingest/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,20 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) {
var (
_, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName)
consumer = consumerFunc(func(ctx context.Context, records []record) error { return nil })
reg = prometheus.NewPedanticRegistry()
)

// Create and start the reader. We expect the reader to start even if partition is empty.
reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second))
reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(time.Second), withRegistry(reg))
require.NoError(t, services.StartAndAwaitRunning(ctx, reader))
require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))

// The last consumed offset should be -1, since nothing has been consumed yet.
assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet.
# TYPE cortex_ingest_storage_reader_last_consumed_offset gauge
cortex_ingest_storage_reader_last_consumed_offset{partition="1"} -1
`), "cortex_ingest_storage_reader_last_consumed_offset"))
})

t.Run("should immediately switch to Running state if configured max lag is 0", func(t *testing.T) {
Expand All @@ -290,6 +298,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) {
var (
cluster, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName)
consumer = consumerFunc(func(ctx context.Context, records []record) error { return nil })
reg = prometheus.NewPedanticRegistry()
)

// Mock Kafka to fail the Fetch request.
Expand All @@ -306,9 +315,16 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) {
t.Log("produced 2 records")

// Create and start the reader. We expect the reader to start even if Fetch is failing.
reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(0))
reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withMaxConsumerLagAtStartup(0), withRegistry(reg))
require.NoError(t, services.StartAndAwaitRunning(ctx, reader))
require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))

// The last consumed offset should be -1, since nothing has been consumed yet (Fetch requests are failing).
assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet.
# TYPE cortex_ingest_storage_reader_last_consumed_offset gauge
cortex_ingest_storage_reader_last_consumed_offset{partition="1"} -1
`), "cortex_ingest_storage_reader_last_consumed_offset"))
})

t.Run("should consume partition from start if last committed offset is missing and wait until max lag is honored", func(t *testing.T) {
Expand Down Expand Up @@ -456,6 +472,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) {

var (
cluster, clusterAddr = testkafka.CreateCluster(t, partitionID+1, topicName)
reg = prometheus.NewPedanticRegistry()
fetchRequestsCount = atomic.NewInt64(0)
fetchShouldFail = atomic.NewBool(true)
consumedRecordsMx sync.Mutex
Expand Down Expand Up @@ -490,7 +507,7 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) {
t.Log("produced 2 records before starting the reader")

// Create and start the reader.
reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromPositionAtStartup(consumeFromEnd), withMaxConsumerLagAtStartup(time.Second))
reader := createReader(t, clusterAddr, topicName, partitionID, consumer, withConsumeFromPositionAtStartup(consumeFromEnd), withMaxConsumerLagAtStartup(time.Second), withRegistry(reg))
require.NoError(t, reader.StartAsync(ctx))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))
Expand Down Expand Up @@ -521,6 +538,15 @@ func TestPartitionReader_ConsumeAtStartup(t *testing.T) {
defer consumedRecordsMx.Unlock()
return slices.Clone(consumedRecords)
})

// We expect the last consumed offset to be tracked in a metric.
test.Poll(t, time.Second, nil, func() interface{} {
return promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_last_consumed_offset The last offset successfully consumed by the partition reader. Set to -1 if not offset has been consumed yet.
# TYPE cortex_ingest_storage_reader_last_consumed_offset gauge
cortex_ingest_storage_reader_last_consumed_offset{partition="1"} 2
`), "cortex_ingest_storage_reader_last_consumed_offset")
})
})

t.Run("should consume partition from start if position=start, and wait until max lag is honored", func(t *testing.T) {
Expand Down

0 comments on commit 092aee1

Please sign in to comment.