From ab29af15cb03b6c7563b22a3bc98d29c47d37f22 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Thu, 11 Jun 2015 13:27:23 +0200 Subject: [PATCH] [FLINK-2183][runtime] fix deadlock for concurrent slot release --- .../flink/runtime/instance/SimpleSlot.java | 15 ++-- .../instance/SlotSharingGroupAssignment.java | 76 ++++++++++--------- ...askManagerFailsWithSlotSharingITCase.scala | 2 +- 3 files changed, 48 insertions(+), 45 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java index 9bc977decb1ef..dbe961ae1d706 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java @@ -142,11 +142,9 @@ public void setLocality(Locality locality) { @Override public void releaseSlot() { - - // try to transition to the CANCELED state. That state marks - // that the releasing is in progress - if (markCancelled()) { - + + if (!isCanceled()) { + // kill all tasks currently running in this slot Execution exec = this.executedTask; if (exec != null && !exec.isFinished()) { @@ -159,9 +157,10 @@ public void releaseSlot() { // otherwise release through the parent shared slot if (getParent() == null) { // we have to give back the slot to the owning instance - getInstance().returnAllocatedSlot(this); - } - else { + if (markCancelled()) { + getInstance().returnAllocatedSlot(this); + } + } else { // we have to ask our parent to dispose us getParent().releaseChild(this); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java index f2b7dba994712..801e9ca6fd997 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java @@ -474,48 +474,52 @@ else if (slotsForGroup.isEmpty()) { */ void releaseSimpleSlot(SimpleSlot simpleSlot) { synchronized (lock) { - // sanity checks - if (simpleSlot.isAlive()) { - throw new IllegalStateException("slot is still alive"); - } - - // check whether the slot is already released - if (simpleSlot.markReleased()) { - - AbstractID groupID = simpleSlot.getGroupID(); - SharedSlot parent = simpleSlot.getParent(); + // try to transition to the CANCELED state. That state marks + // that the releasing is in progress + if (simpleSlot.markCancelled()) { - // if we have a group ID, then our parent slot is tracked here - if (groupID != null && !allSlots.contains(parent)) { - throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before."); + // sanity checks + if (simpleSlot.isAlive()) { + throw new IllegalStateException("slot is still alive"); } - int parentRemaining = parent.removeDisposedChildSlot(simpleSlot); - - if (parentRemaining > 0) { - // the parent shared slot is still alive. make sure we make it - // available again to the group of the just released slot - - if (groupID != null) { - // if we have a group ID, then our parent becomes available - // for that group again. otherwise, the slot is part of a - // co-location group and nothing becomes immediately available - - Map> slotsForJid = availableSlotsPerJid.get(groupID); + // check whether the slot is already released + if (simpleSlot.markReleased()) { - // sanity check - if (slotsForJid == null) { - throw new IllegalStateException("Trying to return a slot for group " + groupID + - " when available slots indicated that all slots were available."); - } + AbstractID groupID = simpleSlot.getGroupID(); + SharedSlot parent = simpleSlot.getParent(); - putIntoMultiMap(slotsForJid, parent.getInstance(), parent); + // if we have a group ID, then our parent slot is tracked here + if (groupID != null && !allSlots.contains(parent)) { + throw new IllegalArgumentException("Slot was not associated with this SlotSharingGroup before."); + } + + int parentRemaining = parent.removeDisposedChildSlot(simpleSlot); + + if (parentRemaining > 0) { + // the parent shared slot is still alive. make sure we make it + // available again to the group of the just released slot + + if (groupID != null) { + // if we have a group ID, then our parent becomes available + // for that group again. otherwise, the slot is part of a + // co-location group and nothing becomes immediately available + + Map> slotsForJid = availableSlotsPerJid.get(groupID); + + // sanity check + if (slotsForJid == null) { + throw new IllegalStateException("Trying to return a slot for group " + groupID + + " when available slots indicated that all slots were available."); + } + + putIntoMultiMap(slotsForJid, parent.getInstance(), parent); + } + } else { + // the parent shared slot is now empty and can be released + parent.markCancelled(); + internalDisposeEmptySharedSlot(parent); } - } - else { - // the parent shared slot is now empty and can be released - parent.markCancelled(); - internalDisposeEmptySharedSlot(parent); } } } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala index 39543f7db3833..e98fd98bb0d21 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala @@ -44,7 +44,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { "The JobManager" should { "handle gracefully failing task manager with slot sharing" in { - val num_tasks = 20 + val num_tasks = 100 val sender = new AbstractJobVertex("Sender") val receiver = new AbstractJobVertex("Receiver")