-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4429; records-lag should be zero if FetchResponse is empty #2155
Conversation
@@ -689,6 +689,9 @@ private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSear | |||
parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed); | |||
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1); | |||
this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset()); | |||
} else if (partition.records.iterator().hasNext()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem right. At a minimum, we're missing the '!' to invert the check. Additionally, iteration through a record set is not cheap (it may involve decompression), so we should try to use the work we've already done. In fact, maybe we could remove the check and compute the lag based on the high watermark and the current position of the consumer instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji Yes, good point. I think we can compute lag as partition.highWatermark - position
if parsed result is empty. But I am not sure if we should use this as lag if parsed result is not empty. It may be a good solution. But doing so changes the semantic of the fetch lag metric since the metric would only be updated when position is updated in the next poll()
call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. This is a little subtle, but it doesn't seem incorrect. Since we know that the fetched offset matches our current position, the fetch lag should be in sync with the position. In fact, it may be more intuitive if we use fetchOffset
instead of position
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji Sorry for late reply. Sure, I will replace position
with fetchOffset
in the patch. I still keep the existing measurement of fetchLag in the case where the response is not empty because I don't want to cause any unexpected problem by making this subtle but backward incompatible change. On the other hand I don't mind if you or someone else want to change it.
@lindong28 |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
@ewencp Sure, I just added a unit test for this patch. Can you take another look? Thank you. |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
} | ||
|
||
private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, short error, long hw, int throttleTime) { | ||
this.records = records; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I copied & pasted this logic without checking if it is needed. Fixed now.
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); | ||
for (int v = 0; v < 3; v++) | ||
builder.appendWithOffset((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); | ||
records = fetchRecords(builder.build(), Errors.NONE.code(), 100L, 100 * i).get(tp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could we use a local variable instead of overriding the field?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, records
is already a variable, but it seems we could move its initialization to here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we can. Personally I think the logic is clearer by passing it as parameter of fetchRecords() instead of via the class variable. Do you think the updated patch looks better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clarify, what I meant is just to move the initialization of the records variable from above to here (since we don't actually need it outside of the loop):
List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE.code(), 100L, 100 * i).get(tp);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see. Sure, it is fixed now.
|
||
// recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse | ||
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); | ||
fetchRecords(builder.build(), Errors.NONE.code(), 100L, 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could also use MemoryRecords.EMPTY
here (I think).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice point. It works:)
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
LGTM. Thanks for the patch! |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Author: Dong Lin <lindong28@gmail.com> Reviewers: Ewen Cheslack-Postava <me@ewencp.org>, Jason Gustafson <jason@confluent.io> Closes apache#2155 from lindong28/KAFKA-4429
No description provided.