fix: include port id in materialization reader actor id to fix self-join deadlock#4985
fix: include port id in materialization reader actor id to fix self-join deadlock#4985Ma77Ball wants to merge 8 commits into
Conversation
|
@Yicong-Huang please review |
I don't think fixing the port id into the actor's name is the right way to go. port id shouldn't be part of actor name and Actor name should be unique. If we have an operator with 5 input ports, this fix would name the actor to 5 different names. The bug should be on the other lower layer: an actor could have multiple channels, and channels are connected between different ports. I think we did not differentiate channels well, and the fix is deeper than just renames, the gateway logic might also require fix. To fix it you need a deeper understanding of amber. |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4985 +/- ##
=========================================
Coverage 42.72% 42.72%
+ Complexity 2185 2184 -1
=========================================
Files 1031 1031
Lines 38152 38156 +4
Branches 4004 4005 +1
=========================================
+ Hits 16302 16304 +2
Misses 20831 20831
- Partials 1019 1021 +2
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Xiao-zhen-Liu
left a comment
There was a problem hiding this comment.
Thanks for digging into this — the symptom analysis (self-join through materialization producing two reader threads with the same ChannelIdentity) is accurate.
However, I do agree with @Yicong-Huang. Actor ID is not the right layer to fix it:
The root issue is that ChannelIdentity only carries (fromWorkerId, toWorkerId, isControl) — no port. And NetworkInputGateway keys its channel map by ChannelIdentity, so two channels between the same actor pair collapse into one AmberFIFOChannel with a single portId slot. That's where the FIFO seq numbers and end-of-channel markers cross-route. Renaming the synthetic "from" actor per destination port sidesteps the collision in the materialization path, but it doesn't address the underlying model.
It also doesn't fix the pipelined case. When the upstream is a real worker (not a synthetic reader thread), its name isn't ours to change. Two pipelined links sharing (fromActor, toActor) but different toPortId will still hit the same AmberFIFOChannel, and the second addInputChannel call will overwrite the port assignment of the first. So Intersect and SymmetricDifference — which don't declare port dependencies and never materialize — would still hang on self-link. #2588 wouldn't really be closed.
And making one reader thread present itself as N actor IDs (one per destination port) breaks the "an actor has one name" invariant. An actor ID should identify the sender, not the (sender, destination) pair.
The proper fix is probably at the channel layer: either add toPortId to ChannelIdentity, or re-key inputChannels by (ChannelIdentity, toPortId). Either way it's a bigger change that needs careful work on the gateway, FIFO tracking, and EOC routing.
Given the scope around and engine-knowledge required for this fix, I think it would be better for @Ma77Ball to close this PR, and I can work on a more fundamental fix later.
What changes were proposed in this PR?
Fixes the Difference operator hanging when one upstream operator (e.g., a single CSV) is wired to both of its input ports.
The bug
InputPortMaterializationReaderThreadper upstream URI.(uri, workerActorId)only.ChannelIdentity→ FIFO sequence numbers and end-of-channel markers cross-routed → one port never drains → Difference hangs.The fix
PortIdentityinto the actor name:MATERIALIZATION_READER_<uri>_port<n>[i]_<workerActorId>.toPortIdthrough the three callers ofgetFromActorIdForInputPortStorage:ResourceAllocator→globalPortId.portId.AssignPortHandler→msg.portId.InputManager→InputPortMaterializationReaderThread(new ctor field)Any related issues, documentation, or discussions?
Closes: #2588
How was this PR tested?
VirtualIdentityUtilsSpec— checks the new ID format and asserts distinct IDs for the same(uri, worker)but different port IDs.ExpansionGreedyScheduleGeneratorSpec— buildscsv → differencewith both inputs from the same csv; assertslevelSets.size > 1, proving materialization happens and the schedule isn't a deadlocked single-level region.Was this PR authored or co-authored using generative AI tooling?
Co-authored with: Claude Opus 4.7 in compliance with ASF