Skip to content

Commit

Permalink
ingest storage: record latency in replaying records (#8176)
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
dimitarvdimitrov committed May 24, 2024
1 parent 487a5c9 commit 8727c7f
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (r *PartitionReader) run(ctx context.Context) error {
}

func (r *PartitionReader) processNextFetches(ctx context.Context, delayObserver prometheus.Observer) {
fetches := r.client.PollFetches(ctx)
fetches := r.pollFetches(ctx)
r.recordFetchesMetrics(fetches, delayObserver)
r.logFetchErrors(fetches)
fetches = filterOutErrFetches(fetches)
Expand Down Expand Up @@ -336,7 +336,9 @@ func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetche
})

for boff.Ongoing() {
consumeStart := time.Now()
err := r.consumer.consume(ctx, records)
r.metrics.consumeLatency.Observe(time.Since(consumeStart).Seconds())
if err == nil {
break
}
Expand Down Expand Up @@ -571,6 +573,13 @@ func (r *PartitionReader) WaitReadConsistency(ctx context.Context) (returnErr er
return r.consumedOffsetWatcher.Wait(ctx, lastProducedOffset)
}

func (r *PartitionReader) pollFetches(ctx context.Context) kgo.Fetches {
defer func(start time.Time) {
r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())
}(time.Now())
return r.client.PollFetches(ctx)
}

type partitionCommitter struct {
services.Service

Expand Down Expand Up @@ -707,10 +716,12 @@ type readerMetrics struct {
recordsPerFetch prometheus.Histogram
fetchesErrors prometheus.Counter
fetchesTotal prometheus.Counter
fetchWaitDuration prometheus.Histogram
strongConsistencyRequests prometheus.Counter
strongConsistencyFailures prometheus.Counter
strongConsistencyLatency prometheus.Histogram
lastConsumedOffset prometheus.Gauge
consumeLatency prometheus.Histogram
kprom *kprom.Metrics
}

Expand Down Expand Up @@ -750,6 +761,16 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetric
Name: "cortex_ingest_storage_reader_fetches_total",
Help: "Total number of Kafka fetches received by the consumer.",
}),
fetchWaitDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingest_storage_reader_fetch_wait_duration_seconds",
Help: "How long fetching a batch of records from the kafka client took to complete.",
NativeHistogramBucketFactor: 1.1,
}),
consumeLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingest_storage_reader_consume_duration_seconds",
Help: "How long a request spent consuming a record batch from Kafka.",
NativeHistogramBucketFactor: 1.1,
}),
strongConsistencyRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_strong_consistency_requests_total",
Help: "Total number of requests for which strong consistency has been requested.",
Expand Down

0 comments on commit 8727c7f

Please sign in to comment.