Skip to content

Conversation

@benclive
Copy link
Contributor

What this PR does / why we need it:
Creates a metric containing E2E processing time: The time between a log line hitting the distributor and it being available for querying by the new engine.

  • I intend to use this as a signal for when the ingestion pipeline is lagging, no matter the reason.
  • It is implemented by tracking the record timestamp (the time when a kafka record was created) and passing it through the dataobj-consumer to the index-builder so it can report it after an index object has been created.
  • Rollout order doesn't matter: The index-builder will ignore the field if it doesn't exist.

@benclive benclive requested a review from a team as a code owner November 28, 2025 14:50
// Update offset metric at the end of processing
defer p.metrics.updateOffset(record.Offset)

if record.Timestamp.Before(p.earliestRecordTime) || p.earliestRecordTime.IsZero() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if record.Timestamp.Before(p.earliestRecordTime)

This condition should be impossible as records are ordered and timestamps assigned by the broker?

I'm happy to ignore it for now, but it would be great if we could validate this at some point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the same. I'm fairly sure its impossible, but in case we ever switch to processing from multiple partitions I decided to keep this in


func getEarliestIndexedRecord(logger log.Logger, events []metastore.ObjectWrittenEvent) time.Time {
var earliestIndexedRecordTime time.Time
for _, ev := range events {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can len(events) ever be 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unlikely but possible I think... Let me guard against setting the metric from a zero time

@benclive benclive merged commit 225a059 into main Nov 28, 2025
70 checks passed
@benclive benclive deleted the benclive/create-e2e-ingestion-metric branch November 28, 2025 15:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants