Skip to content

Commit

Permalink
[hotfix][coordination] Do not schedule shared slot bulk if some slots…
Browse files Browse the repository at this point in the history
… have failed immediately.
  • Loading branch information
azagrebin committed Oct 21, 2020
1 parent 102aabe commit e98f87a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -126,7 +127,8 @@ public List<SlotExecutionVertexAssignment> allocateSlotsFor(
Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments =
allocateLogicalSlotsFromSharedSlots(sharedSlotProfileRetriever, executionsByGroup);

bulkChecker.schedulePendingRequestBulkTimeoutCheck(createBulk(executionsByGroup), allocationTimeout);
createBulk(executionsByGroup)
.ifPresent(bulk -> bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, allocationTimeout));

return executionVertexIds.stream().map(assignments::get).collect(Collectors.toList());
}
Expand Down Expand Up @@ -212,20 +214,24 @@ private ResourceProfile getPhysicalSlotResourceProfile(ExecutionSlotSharingGroup
.reduce(ResourceProfile.ZERO, (r, e) -> r.merge(resourceProfileRetriever.apply(e)), ResourceProfile::merge);
}

private SharingPhysicalSlotRequestBulk createBulk(Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) {
Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests = executions
.keySet()
.stream()
.collect(Collectors.toMap(
group -> group,
group -> sharedSlots.get(group).getPhysicalSlotResourceProfile()
));
/**
 * Creates the physical slot request bulk for the given slot sharing groups, unless one of
 * the groups can no longer be served.
 *
 * <p>For every group in {@code executions} the matching entry of {@code sharedSlots} is looked up.
 * If an entry is missing, or its slot context future has already completed exceptionally,
 * no bulk is created and nothing is registered.
 *
 * @param executions execution vertices keyed by the slot sharing group they belong to
 * @return the created bulk with its callbacks registered, or {@link Optional#empty()} if any
 *     group has no shared slot or an already failed one
 */
private Optional<SharingPhysicalSlotRequestBulk> createBulk(
Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) {
// group -> resource profile of the physical slot requested for that group
Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests = new HashMap<>();
for (ExecutionSlotSharingGroup group : executions.keySet()) {
SharedSlot sharedSlot = sharedSlots.get(group);
// isCompletedExceptionally() is only true for a future that is already done,
// so a still pending request does not abort the bulk creation
if (sharedSlot == null || sharedSlot.getSlotContextFuture().isCompletedExceptionally()) {
// there is no shared slot for this group or its physical slot has already failed
// hence there is no point to track the whole bulk
return Optional.empty();
}
pendingRequests.put(group, sharedSlot.getPhysicalSlotResourceProfile());
}
// all groups are initially passed as pending; this::cancelLogicalSlotRequest is handed over
// as the third constructor argument (presumably the cancellation callback -- TODO confirm)
SharingPhysicalSlotRequestBulk bulk = new SharingPhysicalSlotRequestBulk(
executions,
pendingRequests,
this::cancelLogicalSlotRequest);
// callbacks are only registered once it is certain that the bulk will be returned
registerPhysicalSlotRequestBulkCallbacks(executions.keySet(), bulk);
return bulk;
return Optional.of(bulk);
}

private void registerPhysicalSlotRequestBulkCallbacks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,21 @@ public void testFailedPhysicalSlotRequestFailsLogicalSlotFuturesAndRemovesShared
assertThat(context.getSlotProvider().getRequests().keySet(), hasSize(2));
}

/**
 * Checks that a physical slot request which fails right away fails the logical slot
 * future and does not lead to a bulk being scheduled in the bulk checker.
 */
@Test
public void testBulkIsNotTrackedIfPhysicalSlotFailsImmediately() {
    TestingPhysicalSlotRequestBulkChecker trackingChecker = new TestingPhysicalSlotRequestBulkChecker();
    AllocationContext failingContext = AllocationContext
        .newBuilder()
        .addGroup(EV1)
        .withBulkChecker(trackingChecker)
        .completePhysicalSlotFutureExceptionally()
        .build();

    // single requested vertex, hence exactly one assignment is expected at index 0
    List<SlotExecutionVertexAssignment> assignments = failingContext.allocateSlotsFor(EV1);
    CompletableFuture<LogicalSlot> slotFuture = assignments.get(0).getLogicalSlotFuture();

    assertThat(slotFuture.isCompletedExceptionally(), is(true));
    assertThat(trackingChecker.isBulkScheduled(), is(false));
}

@Test
public void testSlotWillBeOccupiedIndefinitelyFalse() throws ExecutionException, InterruptedException {
testSlotWillBeOccupiedIndefinitely(false);
Expand Down Expand Up @@ -201,7 +216,7 @@ private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupie
public void testReturningLogicalSlotsRemovesSharedSlot() throws Exception {
// physical slot request is completed and completes logical requests
testLogicalSlotRequestCancellationOrRelease(
false,
FutureCompletionStyle.NORMALLY,
true,
(context, assignment) -> assignment.getLogicalSlotFuture().get().releaseSlot(null));
}
Expand All @@ -210,7 +225,7 @@ public void testReturningLogicalSlotsRemovesSharedSlot() throws Exception {
public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() throws Exception {
// physical slot request is not completed and does not complete logical requests
testLogicalSlotRequestCancellationOrRelease(
true,
FutureCompletionStyle.MANUALLY,
true,
(context, assignment) -> {
context.getAllocator().cancel(assignment.getExecutionVertexId());
Expand All @@ -227,7 +242,7 @@ public void testLogicalSlotCancelsPhysicalSlotRequestAndRemovesSharedSlot() thro
public void testCompletedLogicalSlotCancelationDoesNotCancelPhysicalSlotRequestAndDoesNotRemoveSharedSlot() throws Exception {
// physical slot request is completed and completes logical requests
testLogicalSlotRequestCancellationOrRelease(
false,
FutureCompletionStyle.NORMALLY,
false,
(context, assignment) -> {
context.getAllocator().cancel(assignment.getExecutionVertexId());
Expand All @@ -236,13 +251,13 @@ public void testCompletedLogicalSlotCancelationDoesNotCancelPhysicalSlotRequestA
}

private static void testLogicalSlotRequestCancellationOrRelease(
boolean completePhysicalSlotFutureManually,
boolean cancelsPhysicalSlotRequestAndRemovesSharedSlot,
BiConsumerWithException<AllocationContext, SlotExecutionVertexAssignment, Exception> cancelOrReleaseAction) throws Exception {
FutureCompletionStyle physicalSlotFutureCompletionStyle,
boolean cancelsPhysicalSlotRequestAndRemovesSharedSlot,
BiConsumerWithException<AllocationContext, SlotExecutionVertexAssignment, Exception> cancelOrReleaseAction) throws Exception {
AllocationContext context = AllocationContext
.newBuilder()
.addGroup(EV1, EV2, EV3)
.completePhysicalSlotFutureManually(completePhysicalSlotFutureManually)
.setPhysicalSlotFutureCompletionStyle(physicalSlotFutureCompletionStyle)
.build();

List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(EV1, EV2);
Expand Down Expand Up @@ -407,10 +422,10 @@ private static class AllocationContext {
private final TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory;

private AllocationContext(
TestingPhysicalSlotProvider slotProvider,
TestingSlotSharingStrategy slotSharingStrategy,
SlotSharingExecutionSlotAllocator allocator,
TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory) {
TestingPhysicalSlotProvider slotProvider,
TestingSlotSharingStrategy slotSharingStrategy,
SlotSharingExecutionSlotAllocator allocator,
TestingSharedSlotProfileRetrieverFactory slotProfileRetrieverFactory) {
this.slotProvider = slotProvider;
this.slotSharingStrategy = slotSharingStrategy;
this.allocator = allocator;
Expand Down Expand Up @@ -451,7 +466,7 @@ private static Builder newBuilder() {

private static class Builder {
private final Collection<ExecutionVertexID[]> groups = new ArrayList<>();
private boolean completePhysicalSlotFutureManually = false;
private FutureCompletionStyle physicalSlotFutureCompletionStyle = FutureCompletionStyle.NORMALLY;
private boolean completeSlotProfileFutureManually = false;
private boolean slotWillBeOccupiedIndefinitely = false;
private PhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
Expand All @@ -461,13 +476,19 @@ private Builder addGroup(ExecutionVertexID... group) {
return this;
}

/**
 * Sets how the physical slot futures should be completed; the value is stored in this
 * builder and handed to the {@code TestingPhysicalSlotProvider} constructor in {@code build()}.
 *
 * @param physicalSlotFutureCompletionStyle completion style to use, replaces any earlier choice
 * @return this builder, to allow call chaining
 */
private Builder setPhysicalSlotFutureCompletionStyle(
FutureCompletionStyle physicalSlotFutureCompletionStyle) {
this.physicalSlotFutureCompletionStyle = physicalSlotFutureCompletionStyle;
return this;
}

private Builder completePhysicalSlotFutureManually() {
completePhysicalSlotFutureManually(true);
this.physicalSlotFutureCompletionStyle = FutureCompletionStyle.MANUALLY;
return this;
}

private Builder completePhysicalSlotFutureManually(boolean value) {
this.completePhysicalSlotFutureManually = value;
private Builder completePhysicalSlotFutureExceptionally() {
this.physicalSlotFutureCompletionStyle = FutureCompletionStyle.EXCEPTIONALLY;
return this;
}

Expand All @@ -487,7 +508,8 @@ private Builder withBulkChecker(PhysicalSlotRequestBulkChecker bulkChecker) {
}

private AllocationContext build() {
TestingPhysicalSlotProvider slotProvider = new TestingPhysicalSlotProvider(completePhysicalSlotFutureManually);
TestingPhysicalSlotProvider slotProvider =
new TestingPhysicalSlotProvider(physicalSlotFutureCompletionStyle);
TestingSharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory =
new TestingSharedSlotProfileRetrieverFactory(completeSlotProfileFutureManually);
TestingSlotSharingStrategy slotSharingStrategy = TestingSlotSharingStrategy.createWithGroups(groups);
Expand All @@ -512,10 +534,10 @@ private static class TestingPhysicalSlotProvider implements PhysicalSlotProvider
private final Map<SlotRequestId, PhysicalSlotRequest> requests;
private final Map<SlotRequestId, CompletableFuture<TestingPhysicalSlot>> responses;
private final Map<SlotRequestId, Throwable> cancellations;
private final boolean completePhysicalSlotFutureManually;
private final FutureCompletionStyle physicalSlotFutureCompletionStyle;

private TestingPhysicalSlotProvider(boolean completePhysicalSlotFutureManually) {
this.completePhysicalSlotFutureManually = completePhysicalSlotFutureManually;
private TestingPhysicalSlotProvider(FutureCompletionStyle physicalSlotFutureCompletionStyle) {
this.physicalSlotFutureCompletionStyle = physicalSlotFutureCompletionStyle;
this.requests = new HashMap<>();
this.responses = new HashMap<>();
this.cancellations = new HashMap<>();
Expand All @@ -527,8 +549,10 @@ public CompletableFuture<PhysicalSlotRequest.Result> allocatePhysicalSlot(Physic
requests.put(slotRequestId, physicalSlotRequest);
CompletableFuture<TestingPhysicalSlot> resultFuture = new CompletableFuture<>();
responses.put(slotRequestId, resultFuture);
if (!completePhysicalSlotFutureManually) {
if (physicalSlotFutureCompletionStyle == FutureCompletionStyle.NORMALLY) {
completePhysicalSlotFutureFor(slotRequestId, new AllocationID());
} else if (physicalSlotFutureCompletionStyle == FutureCompletionStyle.EXCEPTIONALLY) {
failPhysicalSlotFutureFor(slotRequestId, new Throwable());
}
return resultFuture.thenApply(physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, physicalSlot));
}
Expand Down Expand Up @@ -645,6 +669,12 @@ private List<ExecutionSlotSharingGroup> getAskedGroups() {
}
}

/**
 * Determines what {@code TestingPhysicalSlotProvider#allocatePhysicalSlot} does with the
 * response future of a request before returning.
 */
private enum FutureCompletionStyle {
// the future is completed right away for a freshly created AllocationID
NORMALLY,
// the future is failed right away with a new Throwable
EXCEPTIONALLY,
// allocatePhysicalSlot neither completes nor fails the future;
// presumably the test completes it later on its own -- TODO confirm
MANUALLY
}

private static class TestingPhysicalSlotRequestBulkChecker implements PhysicalSlotRequestBulkChecker {
private PhysicalSlotRequestBulk bulk;
private Time timeout;
Expand All @@ -660,6 +690,10 @@ public void schedulePendingRequestBulkTimeoutCheck(PhysicalSlotRequestBulk bulk,
this.timeout = timeout;
}

/**
 * Tells whether a bulk has been recorded by this checker.
 *
 * <p>NOTE(review): the {@code bulk} field is presumably assigned in
 * {@code schedulePendingRequestBulkTimeoutCheck}; the assignment is not visible here -- confirm.
 *
 * @return {@code true} if the {@code bulk} field is non-null
 */
private boolean isBulkScheduled() {
return bulk != null;
}

/**
 * Returns the current value of the {@code bulk} field as is; no null check is done here, so
 * callers may use {@code isBulkScheduled()} first if the field might not have been set.
 */
private PhysicalSlotRequestBulk getBulk() {
return bulk;
}
Expand Down

0 comments on commit e98f87a

Please sign in to comment.