Skip to content

Commit

Permalink
[FLINK-19552][coordination] Consider only available input location pr…
Browse files Browse the repository at this point in the history
…eferences for slot profile in pipelined region scheduling

The pipelined region scheduling strategy schedules regions once all their input blocking dependencies are ready.
The SlotSharingGroups of the region can include executions of other regions which are not scheduled yet including their dependencies.
Hence we should not wait for other, not yet available input dependencies, because doing so would block the scheduling of the current region.

The new SlotSharingExecutionSlotAllocator creates the DefaultSyncPreferredLocationsRetriever
where the original InputsLocationsRetriever is wrapped with the AvailableInputsLocationsRetriever.
It makes the InputsLocationsRetriever return only completed input location futures; all others are filtered out.
This allows the DefaultSyncPreferredLocationsRetriever to always obtain an already completed future, which makes it synchronous and non-blocking.

This closes #13730.
  • Loading branch information
azagrebin committed Oct 27, 2020
1 parent 4474291 commit 4b3150d
Show file tree
Hide file tree
Showing 12 changed files with 372 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
 * {@link InputsLocationsRetriever} decorator which exposes only those input locations
 * that are already available.
 *
 * <p>A location future returned by the wrapped retriever is kept only if it is done and
 * did not complete exceptionally; any other future is reported as an empty {@link Optional}.
 */
class AvailableInputsLocationsRetriever implements InputsLocationsRetriever {
    private final InputsLocationsRetriever delegate;

    AvailableInputsLocationsRetriever(InputsLocationsRetriever inputsLocationsRetriever) {
        this.delegate = inputsLocationsRetriever;
    }

    /**
     * Forwards the call to the wrapped retriever without any filtering.
     *
     * @param executionVertexId id of the consuming execution vertex
     * @return the producers exactly as reported by the wrapped retriever
     */
    @Override
    public Collection<Collection<ExecutionVertexID>> getConsumedResultPartitionsProducers(
            ExecutionVertexID executionVertexId) {
        return delegate.getConsumedResultPartitionsProducers(executionVertexId);
    }

    /**
     * Returns the location future of the given vertex only if it is already successfully completed.
     *
     * @param executionVertexId id of the execution vertex
     * @return the location future from the wrapped retriever if it is done and not completed
     *         exceptionally, otherwise an empty {@link Optional}
     */
    @Override
    public Optional<CompletableFuture<TaskManagerLocation>> getTaskManagerLocation(ExecutionVertexID executionVertexId) {
        Optional<CompletableFuture<TaskManagerLocation>> locationFuture =
            delegate.getTaskManagerLocation(executionVertexId);
        return locationFuture.filter(AvailableInputsLocationsRetriever::isAvailable);
    }

    /** Tells whether the future is done and did not complete exceptionally. */
    private static boolean isAvailable(CompletableFuture<TaskManagerLocation> locationFuture) {
        return locationFuture.isDone() && !locationFuture.isCompletedExceptionally();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
 * Synchronous counterpart of {@link DefaultPreferredLocationsRetriever}.
 *
 * <p>Internally a {@link DefaultPreferredLocationsRetriever} is used, but its
 * {@link InputsLocationsRetriever} is wrapped into an {@link AvailableInputsLocationsRetriever}.
 * As a result only input locations which are available immediately are considered, and
 * {@link #getPreferredLocations(ExecutionVertexID, Set)} can hand out the locations directly
 * instead of a {@link CompletableFuture}. As {@link StateLocationRetriever} is already synchronous,
 * the overall location retrieval becomes synchronous without blocking.
 */
class DefaultSyncPreferredLocationsRetriever implements SyncPreferredLocationsRetriever {
    private final PreferredLocationsRetriever asyncPreferredLocationsRetriever;

    DefaultSyncPreferredLocationsRetriever(
            StateLocationRetriever stateLocationRetriever,
            InputsLocationsRetriever inputsLocationsRetriever) {
        // restrict the input locations to the ones which are known right now
        InputsLocationsRetriever availableInputsOnly =
            new AvailableInputsLocationsRetriever(inputsLocationsRetriever);
        this.asyncPreferredLocationsRetriever =
            new DefaultPreferredLocationsRetriever(stateLocationRetriever, availableInputsOnly);
    }

    /**
     * Returns the preferred locations which can be determined immediately.
     *
     * @param executionVertexId id of the execution vertex
     * @param producersToIgnore producer vertices to ignore when calculating input locations
     * @return the immediately available preferred locations
     * @throws IllegalStateException if the underlying future is unexpectedly not done
     */
    @Override
    public Collection<TaskManagerLocation> getPreferredLocations(
            ExecutionVertexID executionVertexId,
            Set<ExecutionVertexID> producersToIgnore) {
        CompletableFuture<Collection<TaskManagerLocation>> locations =
            asyncPreferredLocationsRetriever.getPreferredLocations(executionVertexId, producersToIgnore);
        // the future is expected to be done already because the wrapped
        // InputsLocationsRetriever returns only immediately available locations,
        // hence join() below does not block
        Preconditions.checkState(locations.isDone());
        return locations.join();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
Expand All @@ -31,20 +30,19 @@
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Factory for {@link MergingSharedSlotProfileRetriever}.
*/
class MergingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
private final PreferredLocationsRetriever preferredLocationsRetriever;
private final SyncPreferredLocationsRetriever preferredLocationsRetriever;

private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;

MergingSharedSlotProfileRetrieverFactory(
PreferredLocationsRetriever preferredLocationsRetriever,
SyncPreferredLocationsRetriever preferredLocationsRetriever,
Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever) {
this.preferredLocationsRetriever = Preconditions.checkNotNull(preferredLocationsRetriever);
this.priorAllocationIdRetriever = Preconditions.checkNotNull(priorAllocationIdRetriever);
Expand Down Expand Up @@ -96,33 +94,24 @@ private MergingSharedSlotProfileRetriever(
*
* @param executionSlotSharingGroup executions sharing the slot.
* @param physicalSlotResourceProfile {@link ResourceProfile} of the slot.
* @return a future of the {@link SlotProfile} to allocate for the {@code executionSlotSharingGroup}.
* @return {@link SlotProfile} to allocate for the {@code executionSlotSharingGroup}.
*/
@Override
public CompletableFuture<SlotProfile> getSlotProfileFuture(
public SlotProfile getSlotProfile(
ExecutionSlotSharingGroup executionSlotSharingGroup,
ResourceProfile physicalSlotResourceProfile) {
Collection<AllocationID> priorAllocations = new HashSet<>();
Collection<CompletableFuture<Collection<TaskManagerLocation>>> preferredLocationsPerExecution = new ArrayList<>();
Collection<TaskManagerLocation> preferredLocations = new ArrayList<>();
for (ExecutionVertexID execution : executionSlotSharingGroup.getExecutionVertexIds()) {
priorAllocations.add(priorAllocationIdRetriever.apply(execution));
preferredLocationsPerExecution.add(preferredLocationsRetriever
.getPreferredLocations(execution, producersToIgnore));
preferredLocations.addAll(preferredLocationsRetriever.getPreferredLocations(execution, producersToIgnore));
}

CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = FutureUtils
.combineAll(preferredLocationsPerExecution)
.thenApply(executionPreferredLocations ->
executionPreferredLocations.stream().flatMap(Collection::stream).collect(Collectors.toList()));

return preferredLocationsFuture.thenApply(
preferredLocations ->
SlotProfile.priorAllocation(
physicalSlotResourceProfile,
physicalSlotResourceProfile,
preferredLocations,
priorAllocations,
allBulkPriorAllocationIds));
return SlotProfile.priorAllocation(
physicalSlotResourceProfile,
physicalSlotResourceProfile,
preferredLocations,
priorAllocations,
allBulkPriorAllocationIds);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
* Computes a {@link SlotProfile} to allocate a slot for executions, sharing the slot.
Expand All @@ -35,9 +34,9 @@ interface SharedSlotProfileRetriever {
*
* @param executionSlotSharingGroup executions sharing the slot.
* @param physicalSlotResourceProfile {@link ResourceProfile} of the slot.
* @return a future of the {@link SlotProfile} to allocate for the {@code executionSlotSharingGroup}.
* @return {@link SlotProfile} to allocate for the {@code executionSlotSharingGroup}.
*/
CompletableFuture<SlotProfile> getSlotProfileFuture(
SlotProfile getSlotProfile(
ExecutionSlotSharingGroup executionSlotSharingGroup,
ResourceProfile physicalSlotResourceProfile);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,11 @@ private SharedSlot getOrAllocateSharedSlot(
.computeIfAbsent(executionSlotSharingGroup, group -> {
SlotRequestId physicalSlotRequestId = new SlotRequestId();
ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group);
CompletableFuture<PhysicalSlot> physicalSlotFuture = sharedSlotProfileRetriever
.getSlotProfileFuture(group, physicalSlotResourceProfile)
.thenCompose(slotProfile -> slotProvider.allocatePhysicalSlot(
new PhysicalSlotRequest(physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely)))
SlotProfile slotProfile = sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
PhysicalSlotRequest physicalSlotRequest =
new PhysicalSlotRequest(physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
CompletableFuture<PhysicalSlot> physicalSlotFuture = slotProvider
.allocatePhysicalSlot(physicalSlotRequest)
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
return new SharedSlot(
physicalSlotRequestId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContex
context.getSchedulingTopology(),
context.getLogicalSlotSharingGroups(),
context.getCoLocationGroups());
PreferredLocationsRetriever preferredLocationsRetriever =
new DefaultPreferredLocationsRetriever(context, context);
SyncPreferredLocationsRetriever preferredLocationsRetriever =
new DefaultSyncPreferredLocationsRetriever(context, context);
SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory = new MergingSharedSlotProfileRetrieverFactory(
preferredLocationsRetriever,
context::getPriorAllocationId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.util.Collection;
import java.util.Set;

/**
* Component to retrieve the preferred locations of an execution vertex synchronously,
* i.e. the locations are returned directly and not wrapped into a future.
*/
@FunctionalInterface
public interface SyncPreferredLocationsRetriever {

/**
* Returns preferred locations of an execution vertex.
*
* @param executionVertexId id of the execution vertex
* @param producersToIgnore producer vertices to ignore when calculating input locations
* @return preferred locations of the execution vertex
*/
Collection<TaskManagerLocation> getPreferredLocations(
ExecutionVertexID executionVertexId,
Set<ExecutionVertexID> producersToIgnore);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.Collection;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

/**
* Tests for {@link AvailableInputsLocationsRetriever}.
*/
public class AvailableInputsLocationsRetrieverTest extends TestLogger {
private static final ExecutionVertexID EV1 = createRandomExecutionVertexId();
private static final ExecutionVertexID EV2 = createRandomExecutionVertexId();

@Test
public void testNoInputLocation() {
TestingInputsLocationsRetriever originalLocationRetriever = getOriginalLocationRetriever();
InputsLocationsRetriever availableInputsLocationsRetriever =
new AvailableInputsLocationsRetriever(originalLocationRetriever);
assertThat(availableInputsLocationsRetriever.getTaskManagerLocation(EV1).isPresent(), is(false));
}

@Test
public void testNoInputLocationIfNotDone() {
TestingInputsLocationsRetriever originalLocationRetriever = getOriginalLocationRetriever();
originalLocationRetriever.markScheduled(EV1);
InputsLocationsRetriever availableInputsLocationsRetriever =
new AvailableInputsLocationsRetriever(originalLocationRetriever);
assertThat(availableInputsLocationsRetriever.getTaskManagerLocation(EV1).isPresent(), is(false));
}

@Test
public void testNoInputLocationIfFailed() {
TestingInputsLocationsRetriever originalLocationRetriever = getOriginalLocationRetriever();
originalLocationRetriever.failTaskManagerLocation(EV1, new Throwable());
InputsLocationsRetriever availableInputsLocationsRetriever =
new AvailableInputsLocationsRetriever(originalLocationRetriever);
assertThat(availableInputsLocationsRetriever.getTaskManagerLocation(EV1).isPresent(), is(false));
}

@Test
public void testInputLocationIfDone() {
TestingInputsLocationsRetriever originalLocationRetriever = getOriginalLocationRetriever();
originalLocationRetriever.assignTaskManagerLocation(EV1);
InputsLocationsRetriever availableInputsLocationsRetriever =
new AvailableInputsLocationsRetriever(originalLocationRetriever);
assertThat(availableInputsLocationsRetriever.getTaskManagerLocation(EV1).isPresent(), is(true));
}

@Test
public void testConsumedResultPartitionsProducers() {
TestingInputsLocationsRetriever originalLocationRetriever = getOriginalLocationRetriever();
InputsLocationsRetriever availableInputsLocationsRetriever =
new AvailableInputsLocationsRetriever(originalLocationRetriever);
Collection<Collection<ExecutionVertexID>> producers =
availableInputsLocationsRetriever.getConsumedResultPartitionsProducers(EV2);
assertThat(producers.size(), is(1));
Collection<ExecutionVertexID> resultProducers = producers.iterator().next();
assertThat(resultProducers, contains(EV1));
}

private static TestingInputsLocationsRetriever getOriginalLocationRetriever() {
return new TestingInputsLocationsRetriever.Builder().connectConsumerToProducer(EV2, EV1).build();
}
}

0 comments on commit 4b3150d

Please sign in to comment.