Fixes Kafka Supervisor Lag Report#13380
Conversation
| return currentOffsets | ||
| .entrySet() | ||
| .stream() | ||
| .filter(e -> latestSequenceFromStream.get(e.getKey()) != null) |
There was a problem hiding this comment.
The old method reports the lag for every partition present in latestSequenceFromStream.
But this new one also seems to be doing the same thing because you are filtering the entries using the latestSequenceFromStream.
I am perhaps missing something but not sure how the two methods are different.
Please also add a test to verify the difference between the two results.
There was a problem hiding this comment.
The previous method includes lags for all the latest partitions of the stream whereas this method only includes the required partitions (as passed by currentOffsets) lags.
There was a problem hiding this comment.
Also will add a unit test for the reports.
|
@kfaraz Updated the PR with a unit test. PTAL. |
kfaraz
left a comment
There was a problem hiding this comment.
Thanks for adding the tests, @tejaswini-imply !
Fixes inclusion of all stream partitions in all tasks.
Description
The PR (Adds Idle feature to
SeekableStreamSupervisorfor inactive stream) - #13144 updates the resulting lag calculation map inKafkaSupervisorto include all the latest partitions from the stream to set the idle state accordingly rather than the previous way of lag calculation only for the partitions actively being read from the stream. This led to an explosion of metrics in lag reports in cases where 1000s of tasks per supervisor are present.This PR creates a new method to generate lags for only those partitions a single task is actively reading from while updating the Supervisor reports.
Key changed/added classes in this PR
KafkaSupervisorThis PR has: