diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 6ee3ffb771bfa..043a659379ac0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -356,6 +356,13 @@ public Buffer getNextReceivedBuffer() { @Override public NotificationResult notifyBufferAvailable(Buffer buffer) { NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED; + // Two remote channels might call this method mutually by task thread and canceller thread concurrently. + // To avoid deadlock issue we can check the released state to return immediately before synchronizing. + // See FLINK-18595 for details. + if (isReleased.get()) { + return notificationResult; + } + try { synchronized (bufferQueue) { checkState(isWaitingForFloatingBuffers,