Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #17748 +/- ##
============================================
- Coverage 40.59% 40.57% -0.03%
Complexity 2574 2574
============================================
Files 5179 5180 +1
Lines 349979 350366 +387
Branches 44749 44803 +54
============================================
+ Hits 142082 142153 +71
- Misses 207897 208213 +316
luoluoyuyu left a comment
Review summary
Uses CommitterKey (including restartTimes) when discarding events on pipe drop so a new pipe instance with the same name is not affected. discardEventsOfPipe and isEventFromPipe are updated consistently across collector, queues, and sink subtask.
Please clarify the null committerKey fallback inline. Otherwise looks good.
  if (enrichedEvent.getPipeName() != null
-     && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) {
+     && (pendingQueue.isEventFromDroppedPipe(enrichedEvent)
Drop logic uses isEventFromDroppedPipe when possible, and falls back to isPipeDropped only when committerKey is null. Please document when committerKey can still be null at collection time; if that window is wide, the fallback could discard events for a recreated pipe with the same name and creationTime.
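The concern above can be made concrete with a small sketch. This is not the PR's code: `DropCheckSketch`, `Event`, `DroppedPipe`, and `shouldDiscard` are hypothetical names, and a null `restartTimes` stands in for a null committerKey. It only illustrates, under those assumptions, how the fallback path loses the ability to tell restart generations apart.

```java
// Hypothetical, simplified model of the drop check; none of these types are the PR's classes.
final class DropCheckSketch {

  static final class Event {
    final String pipeName;
    final long creationTime;
    final int regionId;
    final Integer restartTimes; // null stands in for "committerKey not assigned yet"

    Event(String pipeName, long creationTime, int regionId, Integer restartTimes) {
      this.pipeName = pipeName;
      this.creationTime = creationTime;
      this.regionId = regionId;
      this.restartTimes = restartTimes;
    }
  }

  static final class DroppedPipe {
    final String pipeName;
    final long creationTime;
    final int regionId;
    final int restartTimes;

    DroppedPipe(String pipeName, long creationTime, int regionId, int restartTimes) {
      this.pipeName = pipeName;
      this.creationTime = creationTime;
      this.regionId = regionId;
      this.restartTimes = restartTimes;
    }
  }

  // Returns true if the event should be discarded because its pipe was dropped.
  static boolean shouldDiscard(Event event, DroppedPipe dropped) {
    if (event.pipeName == null) {
      return false;
    }
    boolean sameTriple =
        dropped.pipeName.equals(event.pipeName)
            && dropped.creationTime == event.creationTime
            && dropped.regionId == event.regionId;
    if (event.restartTimes != null) {
      // Preferred path: the restart generation is known, so compare it as well.
      return sameTriple && dropped.restartTimes == event.restartTimes;
    }
    // Fallback path: only (pipeName, creationTime, regionId) is available,
    // so events of any restart generation match.
    return sameTriple;
  }
}
```

In this sketch an event with a known generation of 1 survives the drop of generation 0, while an event with no key yet is discarded on the triple alone. That is the window the comment asks to have documented.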
return committerKey.getPipeName().equals(event.getPipeName())
    && committerKey.getCreationTime() == event.getCreationTime()
    && committerKey.getRegionId() == event.getRegionId()
    && (committerKey.getRestartTimes() < 0 || committerKey.equals(event.getCommitterKey()));
restartTimes < 0 skips restart comparison for drop-all-restarts behavior. Consider a named constant (e.g. DROP_ALL_RESTARTS) instead of a magic -1, and add a test: drop pipe, recreate same name, verify new events are not discarded.
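A minimal sketch of the suggested constant and the scenario to test might look like the following. `Key`, `DROP_ALL_RESTARTS`, and `matches` are hypothetical names used for illustration; the real CommitterKey carries more state and its API may differ. Note also that this sketch compares against the sentinel with `==`, whereas the snippet above treats any negative value as the wildcard.

```java
// Hypothetical, simplified stand-in for a committer key; not the PR's class.
final class Key {
  // Assumed sentinel: a dropped-pipe key with this value matches every restart generation.
  static final int DROP_ALL_RESTARTS = -1;

  final String pipeName;
  final long creationTime;
  final int regionId;
  final int restartTimes;

  Key(String pipeName, long creationTime, int regionId, int restartTimes) {
    this.pipeName = pipeName;
    this.creationTime = creationTime;
    this.regionId = regionId;
    this.restartTimes = restartTimes;
  }

  // True if an event carrying eventKey belongs to the dropped pipe identified by this key.
  boolean matches(Key eventKey) {
    if (eventKey == null) {
      return false;
    }
    boolean samePipe =
        pipeName.equals(eventKey.pipeName)
            && creationTime == eventKey.creationTime
            && regionId == eventKey.regionId;
    // The named sentinel replaces the bare "restartTimes < 0" check.
    return samePipe
        && (restartTimes == DROP_ALL_RESTARTS || restartTimes == eventKey.restartTimes);
  }
}
```

With this shape, the requested test reduces to three checks: a key for generation 0 matches events of generation 0, does not match events of generation 1 from the same pipe, and a key built with `DROP_ALL_RESTARTS` matches both.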


Description
This PR fixes pipe event discard logic when dropping a pipe task by using CommitterKey instead of only (pipeName, creationTime, regionId).
Previously, queued/retry/batched events were matched only by pipe name, creation time, and region id, which could incorrectly discard events from a restarted pipe task. This change propagates the full committer key through pending queues, sink subtasks, batch builders, and sink implementations so discard checks can distinguish task restart generations.
Changes
CommitterKey for dropped pipe task tracking and event matching.
This PR has:
for an unfamiliar reader.
for code coverage.
Key changed/added classes (or packages if there are too many classes) in this PR