
[kafka_consumer] Use AdminClient.list_offsets for earliest fetch and isolate its failures#23580

Open
piochelepiotr wants to merge 3 commits into master from piotr.wolski/kafka-consumer-list-offsets-resilient

Conversation


@piochelepiotr piochelepiotr commented May 4, 2026

What does this PR do?

Two changes to _collect_topic_metadata in the kafka_consumer cluster monitoring path:

  1. Switch the earliest-offset fetch to AdminClient.list_offsets(OffsetSpec.earliest()). The previous path went through Consumer.offsets_for_times(timestamp=0), which the broker services by walking .timeindex segment files. The new path uses the EARLIEST_TIMESTAMP sentinel (-2), which the broker returns directly from in-memory logStartOffset.

  2. Isolate the call's failures. A new _fetch_earliest_offsets helper catches request-level and per-partition failures and returns {} (or a partial map) instead of propagating. The per-partition loop now treats a missing earliest as "skip just the earliest-dependent metrics for this partition" rather than aborting the whole topic-metadata collection.

Motivation

Customer flare showed:

2026-04-29 14:46:19 cluster_metadata.py | Found 18945 topics
2026-04-29 14:47:52 cluster_metadata.py | Error collecting topic metadata:
  KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Failed to get offsets: Broker: Request timed out"}

The offsets_for_times(ts=0) call timed out (large cluster, many segments per partition, wide broker fan-out). Because the call sits at the top of _collect_topic_metadata, the entire function aborted, dropping topic.message_rate, topic.partitions, partition.isr/replicas/under_replicated/offline, and topic.config.* (none of which depends on earliest offsets), along with the genuinely earliest-dependent topic.size and partition.size.

Verification

Reproduced on a local 20k-topic / 40k-partition / 20k-consumer-group cluster with the timeout simulated:

| Scenario | Total samples | topic.message_rate | topic.size | partition.size |
| --- | --- | --- | --- | --- |
| Healthy (post-fix) | 480,217 | 20,001 | 20,001 | 40,001 |
| Earliest fetch fails (post-fix) | 380,214 | 20,001 | 0 (skipped) | 0 (skipped) |
| Earliest fetch fails (pre-fix) | 180k | 0 | 0 | 0 |

After the fix, only the 3 metrics that genuinely require earliest offsets are skipped on failure; everything else keeps emitting.
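The per-partition skip described above can be sketched as follows. This is illustrative only: the `gauge` signature, the tuple shape, and metric names other than `partition.beginning_offset`, `partition.size`, and `topic.size` (which the commit message names) are assumptions, not the integration's real code.

```python
def report_partition_metrics(gauge, partitions, earliest_offsets):
    """Emit partition metrics, skipping only earliest-dependent ones.

    partitions: iterable of (topic, partition, highwater) tuples.
    earliest_offsets: {(topic, partition): earliest}, possibly partial
    or empty when the earliest fetch failed.
    """
    topic_sizes = {}
    for topic, part, high in partitions:
        tags = [f"topic:{topic}", f"partition:{part}"]
        # Always emitted: does not depend on earliest offsets.
        # (Metric name is hypothetical.)
        gauge("kafka.partition.highwater", high, tags)
        earliest = earliest_offsets.get((topic, part))
        if earliest is None:
            # Skip only beginning_offset, partition.size, and this
            # partition's contribution to topic.size.
            continue
        gauge("kafka.partition.beginning_offset", earliest, tags)
        gauge("kafka.partition.size", high - earliest, tags)
        topic_sizes[topic] = topic_sizes.get(topic, 0) + (high - earliest)
    for topic, size in topic_sizes.items():
        gauge("kafka.topic.size", size, [f"topic:{topic}"])
```

With an empty `earliest_offsets` map, the loop still emits every earliest-independent metric, which is exactly the post-fix behavior shown in the table.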

Review checklist (to be filled by the reviewer)

  • Feature or bugfix MUST have tests
  • Changelog entries are necessary in most cases (use ddev release changelog new)
  • Make sure the PR title is descriptive and follows conventions

🤖 Generated with Claude Code

…isolate its failures

Replace the LOW_WATERMARK fetch in `_collect_topic_metadata` (which went
through `Consumer.offsets_for_times(timestamp=0)` and forced the broker
to walk `.timeindex` segment files) with `AdminClient.list_offsets` and
`OffsetSpec.earliest()`. The broker services this from in-memory
`logStartOffset` instead, eliminating the time-index scan that was
timing out for clusters with many segments x multi-broker fan-out.

Wrap the call so a failure no longer aborts the entire topic-metadata
collection. When the earliest fetch fails (or returns errors per
partition), only the metrics that genuinely depend on it are skipped:
`partition.beginning_offset`, `partition.size`, and `topic.size`.
Everything else - `topic.message_rate`, `topic.partitions`,
`partition.isr/replicas/under_replicated/offline`, `topic.config.*`,
and all consumer-side metrics - keeps emitting.

Verified locally on a 20k-topic / 40k-partition / 20k-consumer-group
cluster: with the failure simulated, sample volume drops from 480k to
380k (only the 100k earliest-dependent samples lost) instead of the
prior 300k+ loss when the whole `_collect_topic_metadata` aborted.
@dd-octo-sts

dd-octo-sts Bot commented May 4, 2026

Validation Report

All 20 validations passed.

| Validation | Description | Status |
| --- | --- | --- |
| agent-reqs | Verify check versions match the Agent requirements file | ✅ |
| ci | Validate CI configuration and Codecov settings | ✅ |
| codeowners | Validate every integration has a CODEOWNERS entry | ✅ |
| config | Validate default configuration files against spec.yaml | ✅ |
| dep | Verify dependency pins are consistent and Agent-compatible | ✅ |
| http | Validate integrations use the HTTP wrapper correctly | ✅ |
| imports | Validate check imports do not use deprecated modules | ✅ |
| integration-style | Validate check code style conventions | ✅ |
| jmx-metrics | Validate JMX metrics definition files and config | ✅ |
| labeler | Validate PR labeler config matches integration directories | ✅ |
| legacy-signature | Validate no integration uses the legacy Agent check signature | ✅ |
| license-headers | Validate Python files have proper license headers | ✅ |
| licenses | Validate third-party license attribution list | ✅ |
| metadata | Validate metadata.csv metric definitions | ✅ |
| models | Validate configuration data models match spec.yaml | ✅ |
| openmetrics | Validate OpenMetrics integrations disable the metric limit | ✅ |
| package | Validate Python package metadata and naming | ✅ |
| readmes | Validate README files have required sections | ✅ |
| saved-views | Validate saved view JSON file structure and fields | ✅ |
| version | Validate version consistency between package and changelog | ✅ |


@codecov

codecov Bot commented May 4, 2026

Codecov Report

❌ Patch coverage is 69.09091% with 17 lines in your changes missing coverage. Please review.
✅ Project coverage is 89.01%. Comparing base (864138f) to head (a90570b).
⚠️ Report is 1 commit behind head on master.


@datadog-datadog-prod-us1-2

Tests

🎉 All green!

❄️ No new flaky tests detected
🧪 All tests passed

🎯 Code Coverage (details)
Patch Coverage: 69.09%
Overall Coverage: 87.57% (+0.40%)



@chnn chnn left a comment


Very nice!
