From 356e5646f79c864a283b0a1f58fdd8339932fd68 Mon Sep 17 00:00:00 2001
From: Zhijiang
Date: Thu, 16 Jul 2020 10:22:25 +0200
Subject: [PATCH] [FLINK-18595][network] Fix the deadlock between the task thread and the canceler thread in RemoteInputChannel

Assume two remote channels, ch1 and ch2, are registered as listeners in LocalBufferPool. The deadlock happens as follows:

1. While the canceler thread is calling ch1#releaseAllResources, it holds ch1's bufferQueue lock and tries to call ch2#notifyBufferAvailable.
2. While the task thread is exiting and calling CachedBufferStorage#close, it might release exclusive buffers for ch2. ch2 then takes its own bufferQueue lock and tries to call ch1#notifyBufferAvailable.
3. ch1 and ch2 each hold their own bufferQueue lock and wait for the other side's bufferQueue lock, which results in a deadlock.

As a solution, RemoteInputChannel#notifyBufferAvailable checks the released state outside of the bufferQueue lock and returns immediately, without acquiring the lock, if the channel is already released.
---
 .../io/network/partition/consumer/RemoteInputChannel.java | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 6ee3ffb771bfa..043a659379ac0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -356,6 +356,13 @@ public Buffer getNextReceivedBuffer() {
 	@Override
 	public NotificationResult notifyBufferAvailable(Buffer buffer) {
 		NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
+		// Two remote channels might call this method on each other concurrently, one from the task thread and one from the canceler thread.
+		// To avoid a deadlock, check the released state and return immediately before synchronizing.
+		// See FLINK-18595 for details.
+		if (isReleased.get()) {
+			return notificationResult;
+		}
+
 		try {
 			synchronized (bufferQueue) {
 				checkState(isWaitingForFloatingBuffers,
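
The lock ordering described in the commit message can be reproduced in isolation. The following is a minimal sketch, not Flink code: `DeadlockSketch`, `Channel`, `recycleExclusiveBuffer`, `runScenario`, and the `checkReleasedFirst` switch are hypothetical names introduced for illustration, and a `CountDownLatch` is assumed as a way to force the interleaving in which both threads already hold their own lock. It only models the two call paths named above and the released-state check added by this patch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative only: Channel is a hypothetical stand-in, not Flink's RemoteInputChannel. */
final class DeadlockSketch {

    static final class Channel {
        private final Object bufferQueue = new Object(); // stands in for the bufferQueue lock
        private final AtomicBoolean isReleased = new AtomicBoolean(false);
        private final boolean checkReleasedFirst;        // hypothetical switch: true = patched behavior
        private final CountDownLatch bothLocksHeld;
        Channel peer;

        Channel(boolean checkReleasedFirst, CountDownLatch bothLocksHeld) {
            this.checkReleasedFirst = checkReleasedFirst;
            this.bothLocksHeld = bothLocksHeld;
        }

        /** Returns true if the channel would take the buffer, false if it ignores the notification. */
        boolean notifyBufferAvailable() {
            // The idea of the patch: return before touching the lock if already released.
            if (checkReleasedFirst && isReleased.get()) {
                return false;
            }
            synchronized (bufferQueue) {
                return !isReleased.get();
            }
        }

        /** Canceler-thread path: mark the channel released, then notify the peer under the lock. */
        void releaseAllResources() {
            isReleased.set(true);
            notifyPeerUnderLock();
        }

        /** Task-thread path: recycling an exclusive buffer may pass a buffer on to the peer. */
        void recycleExclusiveBuffer() {
            notifyPeerUnderLock();
        }

        private void notifyPeerUnderLock() {
            synchronized (bufferQueue) {
                awaitOtherThread(bothLocksHeld); // force the interleaving: both locks are held here
                peer.notifyBufferAvailable();
            }
        }
    }

    private static void awaitOtherThread(CountDownLatch latch) {
        latch.countDown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if both threads finish, false if at least one is still blocked after the joins time out. */
    static boolean runScenario(boolean checkReleasedFirst) {
        CountDownLatch bothLocksHeld = new CountDownLatch(2);
        Channel ch1 = new Channel(checkReleasedFirst, bothLocksHeld);
        Channel ch2 = new Channel(checkReleasedFirst, bothLocksHeld);
        ch1.peer = ch2;
        ch2.peer = ch1;

        Thread canceler = new Thread(ch1::releaseAllResources, "canceler");
        Thread task = new Thread(ch2::recycleExclusiveBuffer, "task");
        canceler.setDaemon(true); // let the JVM exit even if the threads deadlock
        task.setDaemon(true);
        canceler.start();
        task.start();
        try {
            canceler.join(1000);
            task.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !canceler.isAlive() && !task.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("without check: finished = " + runScenario(false));
        System.out.println("with check:    finished = " + runScenario(true));
    }
}
```

In this sketch the task thread's call into the already released ch1 returns without waiting for ch1's lock, so the task thread leaves ch2's lock and the canceler thread can then proceed. With the check disabled, each thread waits for the lock the other one holds and neither finishes.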