Skip to content

Commit

Permalink
[FLINK-22368] Deque channel after releasing on EndOfPartition
Browse files Browse the repository at this point in the history
...and don't enqueue the channel if it received EndOfPartition
previously.

Leaving a released channel enqueued may lead to
CancelTaskException which can prevent EndOfPartitionEvent
propagation and the job being stuck.
  • Loading branch information
rkhachatryan committed May 3, 2021
1 parent ab75206 commit 72cc560
Showing 1 changed file with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public class SingleInputGate extends IndexedInputGate {
@GuardedBy("inputChannelsWithData")
private final BitSet enqueuedInputChannelsWithData;

@GuardedBy("inputChannelsWithData")
private final BitSet channelsWithEndOfPartitionEvents;

@GuardedBy("inputChannelsWithData")
Expand Down Expand Up @@ -729,16 +730,24 @@ private BufferOrEvent transformEvent(
}

if (event.getClass() == EndOfPartitionEvent.class) {
channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());

if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
synchronized (inputChannelsWithData) {
checkState(!channelsWithEndOfPartitionEvents.get(currentChannel.getChannelIndex()));
channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
hasReceivedAllEndOfPartitionEvents =
channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels;

enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
if (inputChannelsWithData.contains(currentChannel)) {
inputChannelsWithData.getAndRemove(channel -> channel == currentChannel);
}
}
if (hasReceivedAllEndOfPartitionEvents) {
// Because of race condition between:
// 1. releasing inputChannelsWithData lock in this method and reaching this place
// 2. empty data notification that re-enqueues a channel
// we can end up with moreAvailable flag set to true, while we expect no more data.
// 2. empty data notification that re-enqueues a channel we can end up with
// moreAvailable flag set to true, while we expect no more data.
checkState(!moreAvailable || !pollNext().isPresent());
moreAvailable = false;
hasReceivedAllEndOfPartitionEvents = true;
markAvailable();
}

Expand Down Expand Up @@ -859,12 +868,6 @@ && isOutdated(
return;
}

if (channel.isReleased()) {
// when channel is closed, EndOfPartitionEvent is send and a final notification
// if EndOfPartitionEvent causes a release, we must ignore the notification
return;
}

if (!queueChannelUnsafe(channel, priority)) {
return;
}
Expand All @@ -889,13 +892,17 @@ private boolean isOutdated(int sequenceNumber, int lastSequenceNumber) {
}

/**
* Queues the channel if not already enqueued, potentially raising the priority.
* Queues the channel if not already enqueued and not received EndOfPartition, potentially
* raising the priority.
*
* @return true iff it has been enqueued/prioritized = some change to {@link
* #inputChannelsWithData} happened
*/
private boolean queueChannelUnsafe(InputChannel channel, boolean priority) {
assert Thread.holdsLock(inputChannelsWithData);
if (channelsWithEndOfPartitionEvents.get(channel.getChannelIndex())) {
return false;
}

final boolean alreadyEnqueued =
enqueuedInputChannelsWithData.get(channel.getChannelIndex());
Expand Down

0 comments on commit 72cc560

Please sign in to comment.