Skip to content

Commit

Permalink
[FLINK-9917][JM] Remove superfluous lock from SlotSharingManager
Browse files Browse the repository at this point in the history
The SlotSharingManager is designed to be used by a single thread. Therefore,
it is the responsibility of the caller to make sure that there is only a single
thread at any given time accesssing this component. Consequently, the component
does not need to be synchronized.
  • Loading branch information
tillrohrmann committed Aug 16, 2018
1 parent 8ba2626 commit 3b3b9a8
Showing 1 changed file with 32 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.util.AbstractCollection;
import java.util.Collection;
Expand Down Expand Up @@ -82,9 +81,6 @@ public class SlotSharingManager {

private static final Logger LOG = LoggerFactory.getLogger(SlotSharingManager.class);

/** Lock for the internal data structures. */
private final Object lock = new Object();

private final SlotSharingGroupId slotSharingGroupId;

/** Actions to release allocated slots after a complete multi task slot hierarchy has been released. */
Expand All @@ -96,11 +92,9 @@ public class SlotSharingManager {
private final Map<SlotRequestId, TaskSlot> allTaskSlots;

/** Root nodes which have not been completed because the allocated slot is still pending. */
@GuardedBy("lock")
private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;

/** Root nodes which have been completed (the underlying allocated slot has been assigned). */
@GuardedBy("lock")
private final Map<TaskManagerLocation, Set<MultiTaskSlot>> resolvedRootSlots;

SlotSharingManager(
Expand Down Expand Up @@ -152,27 +146,23 @@ MultiTaskSlot createRootSlot(

allTaskSlots.put(slotRequestId, rootMultiTaskSlot);

synchronized (lock) {
unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
}
unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);

// add the root node to the set of resolved root nodes once the SlotContext future has
// been completed and we know the slot's TaskManagerLocation
slotContextFuture.whenComplete(
(SlotContext slotContext, Throwable throwable) -> {
if (slotContext != null) {
synchronized (lock) {
final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);

if (resolvedRootNode != null) {
LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
if (resolvedRootNode != null) {
LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());

final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
slotContext.getTaskManagerLocation(),
taskManagerLocation -> new HashSet<>(4));
final Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
slotContext.getTaskManagerLocation(),
taskManagerLocation -> new HashSet<>(4));

innerCollection.add(resolvedRootNode);
}
innerCollection.add(resolvedRootNode);
}
} else {
rootMultiTaskSlot.release(throwable);
Expand All @@ -193,15 +183,13 @@ MultiTaskSlot createRootSlot(
*/
@Nullable
MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy matcher, SlotProfile slotProfile) {
synchronized (lock) {
Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values();
return matcher.findMatchWithLocality(
slotProfile,
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
(MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
(MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),
MultiTaskSlotLocality::of);
}
Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues = this.resolvedRootSlots.values();
return matcher.findMatchWithLocality(
slotProfile,
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
(MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(),
(MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId),
MultiTaskSlotLocality::of);
}

/**
Expand All @@ -213,11 +201,9 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy
*/
@Nullable
MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
synchronized (lock) {
for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
if (!multiTaskSlot.contains(groupId)) {
return multiTaskSlot;
}
for (MultiTaskSlot multiTaskSlot : unresolvedRootSlots.values()) {
if (!multiTaskSlot.contains(groupId)) {
return multiTaskSlot;
}
}

Expand All @@ -228,11 +214,9 @@ MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
public String toString() {
final StringBuilder builder = new StringBuilder("{\n\tgroupId=").append(slotSharingGroupId).append('\n');

synchronized (lock) {
builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
builder.append("\tall=").append(allTaskSlots).append('\n');
}
builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
builder.append("\tall=").append(allTaskSlots).append('\n');

return builder.append('}').toString();
}
Expand Down Expand Up @@ -479,26 +463,20 @@ public void release(Throwable cause) {
parent.releaseChild(getGroupId());
} else if (allTaskSlots.remove(getSlotRequestId()) != null) {
// we are the root node --> remove the root node from the list of task slots
final MultiTaskSlot unresolvedRootSlot = unresolvedRootSlots.remove(getSlotRequestId());

if (!slotContextFuture.isDone() || slotContextFuture.isCompletedExceptionally()) {
synchronized (lock) {
// the root node should still be unresolved
unresolvedRootSlots.remove(getSlotRequestId());
}
} else {
if (unresolvedRootSlot == null) {
// the root node should be resolved --> we can access the slot context
final SlotContext slotContext = slotContextFuture.getNow(null);

if (slotContext != null) {
synchronized (lock) {
final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
final Set<MultiTaskSlot> multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());

if (multiTaskSlots != null) {
multiTaskSlots.remove(this);
if (multiTaskSlots != null) {
multiTaskSlots.remove(this);

if (multiTaskSlots.isEmpty()) {
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
}
if (multiTaskSlots.isEmpty()) {
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
}
}
}
Expand Down Expand Up @@ -637,9 +615,7 @@ public Collection<MultiTaskSlot> getResolvedRootSlots() {

@VisibleForTesting
Collection<MultiTaskSlot> getUnresolvedRootSlots() {
synchronized (lock) {
return unresolvedRootSlots.values();
}
return unresolvedRootSlots.values();
}

/**
Expand All @@ -649,19 +625,15 @@ private final class ResolvedRootSlotValues extends AbstractCollection<MultiTaskS

@Override
public Iterator<MultiTaskSlot> iterator() {
synchronized (lock) {
return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
}
return new ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
}

@Override
public int size() {
int numberResolvedMultiTaskSlots = 0;

synchronized (lock) {
for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
numberResolvedMultiTaskSlots += multiTaskSlots.size();
}
for (Set<MultiTaskSlot> multiTaskSlots : resolvedRootSlots.values()) {
numberResolvedMultiTaskSlots += multiTaskSlots.size();
}

return numberResolvedMultiTaskSlots;
Expand Down

0 comments on commit 3b3b9a8

Please sign in to comment.