Skip to content

Commit

Permalink
[FLINK-19552][coordination] Consider only available input location pr…
Browse files Browse the repository at this point in the history
…eferences for slot profile in pipelined region scheduling
  • Loading branch information
azagrebin committed Oct 21, 2020
1 parent 9a4def1 commit d4a0257
Show file tree
Hide file tree
Showing 12 changed files with 374 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
 * An {@link InputsLocationsRetriever} decorator that exposes only those input locations
 * which are already available.
 *
 * <p>A location future obtained from the wrapped retriever is kept only if it is done and
 * did not complete exceptionally; otherwise the location is reported as absent.
 * Queries for the producers of consumed result partitions are forwarded unchanged.
 */
class AvailableInputsLocationsRetriever implements InputsLocationsRetriever {
    /** The wrapped retriever that all calls are forwarded to. */
    private final InputsLocationsRetriever delegate;

    /**
     * Creates the decorator.
     *
     * @param inputsLocationsRetriever the retriever whose location futures are filtered
     */
    AvailableInputsLocationsRetriever(InputsLocationsRetriever inputsLocationsRetriever) {
        this.delegate = inputsLocationsRetriever;
    }

    /**
     * Forwards to the wrapped retriever without any filtering.
     *
     * @param executionVertexId id of the consuming execution vertex
     * @return whatever the wrapped retriever reports for {@code executionVertexId}
     */
    @Override
    public Collection<Collection<ExecutionVertexID>> getConsumedResultPartitionsProducers(
            ExecutionVertexID executionVertexId) {
        return delegate.getConsumedResultPartitionsProducers(executionVertexId);
    }

    /**
     * Returns the location future of the wrapped retriever only if it has already
     * completed normally.
     *
     * @param executionVertexId id of the execution vertex whose location is requested
     * @return the completed location future, or an empty {@link Optional} if the wrapped
     *     retriever has no future, or the future is pending or completed exceptionally
     */
    @Override
    public Optional<CompletableFuture<TaskManagerLocation>> getTaskManagerLocation(ExecutionVertexID executionVertexId) {
        Optional<CompletableFuture<TaskManagerLocation>> candidate =
            delegate.getTaskManagerLocation(executionVertexId);
        return candidate.filter(AvailableInputsLocationsRetriever::isCompletedNormally);
    }

    /** Tells whether the given future is done and did not complete exceptionally. */
    private static boolean isCompletedNormally(CompletableFuture<?> locationFuture) {
        return locationFuture.isDone() && !locationFuture.isCompletedExceptionally();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
 * {@link SyncPreferredLocationsRetriever} backed by a {@link DefaultPreferredLocationsRetriever}.
 *
 * <p>The given {@link InputsLocationsRetriever} is wrapped into an
 * {@link AvailableInputsLocationsRetriever} before it is handed to the
 * {@link DefaultPreferredLocationsRetriever}, so only input locations that are available
 * immediately are taken into account. The future produced by the underlying retriever is
 * therefore expected to be done already when it is returned, which lets
 * {@link #getPreferredLocations(ExecutionVertexID, Set)} hand out the locations directly
 * instead of a {@link CompletableFuture}.
 */
class DefaultSyncPreferredLocationsRetriever implements SyncPreferredLocationsRetriever {
    /** Future-based retriever that only sees already available input locations. */
    private final PreferredLocationsRetriever delegate;

    /**
     * Creates the retriever.
     *
     * @param stateLocationRetriever passed through to the underlying retriever as is
     * @param inputsLocationsRetriever wrapped so that only available input locations are visible
     */
    DefaultSyncPreferredLocationsRetriever(
            StateLocationRetriever stateLocationRetriever,
            InputsLocationsRetriever inputsLocationsRetriever) {
        AvailableInputsLocationsRetriever availableInputsOnly =
            new AvailableInputsLocationsRetriever(inputsLocationsRetriever);
        this.delegate = new DefaultPreferredLocationsRetriever(stateLocationRetriever, availableInputsOnly);
    }

    /**
     * Returns the preferred locations that can be determined right away.
     *
     * @param executionVertexId id of the execution vertex
     * @param producersToIgnore producer vertices to ignore when calculating input locations
     * @return the preferred locations of the execution vertex
     * @throws IllegalStateException if the underlying future is unexpectedly not done
     */
    @Override
    public Collection<TaskManagerLocation> getPreferredLocations(
            ExecutionVertexID executionVertexId,
            Set<ExecutionVertexID> producersToIgnore) {
        CompletableFuture<Collection<TaskManagerLocation>> locations =
            delegate.getPreferredLocations(executionVertexId, producersToIgnore);
        // guard the assumption that makes the join() below non-blocking
        Preconditions.checkState(locations.isDone());
        // the future is already done, so join() returns its result without waiting
        return locations.join();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
Expand All @@ -31,20 +30,19 @@
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Factory for {@link MergingSharedSlotProfileRetriever}.
*/
class MergingSharedSlotProfileRetrieverFactory implements SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory {
private final PreferredLocationsRetriever preferredLocationsRetriever;
private final SyncPreferredLocationsRetriever preferredLocationsRetriever;

private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever;

MergingSharedSlotProfileRetrieverFactory(
PreferredLocationsRetriever preferredLocationsRetriever,
SyncPreferredLocationsRetriever preferredLocationsRetriever,
Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever) {
this.preferredLocationsRetriever = Preconditions.checkNotNull(preferredLocationsRetriever);
this.priorAllocationIdRetriever = Preconditions.checkNotNull(priorAllocationIdRetriever);
Expand Down Expand Up @@ -96,33 +94,24 @@ private MergingSharedSlotProfileRetriever(
*
* @param executionSlotSharingGroup executions sharing the slot.
* @param physicalSlotResourceProfile {@link ResourceProfile} of the slot.
* @return a future of the {@link SlotProfile} to allocate for the {@code executionSlotSharingGroup}.
* @return {@link SlotProfile} to allocate for the {@code executionSlotSharingGroup}.
*/
@Override
public CompletableFuture<SlotProfile> getSlotProfileFuture(
public SlotProfile getSlotProfile(
ExecutionSlotSharingGroup executionSlotSharingGroup,
ResourceProfile physicalSlotResourceProfile) {
Collection<AllocationID> priorAllocations = new HashSet<>();
Collection<CompletableFuture<Collection<TaskManagerLocation>>> preferredLocationsPerExecution = new ArrayList<>();
Collection<TaskManagerLocation> preferredLocations = new ArrayList<>();
for (ExecutionVertexID execution : executionSlotSharingGroup.getExecutionVertexIds()) {
priorAllocations.add(priorAllocationIdRetriever.apply(execution));
preferredLocationsPerExecution.add(preferredLocationsRetriever
.getPreferredLocations(execution, producersToIgnore));
preferredLocations.addAll(preferredLocationsRetriever.getPreferredLocations(execution, producersToIgnore));
}

CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = FutureUtils
.combineAll(preferredLocationsPerExecution)
.thenApply(executionPreferredLocations ->
executionPreferredLocations.stream().flatMap(Collection::stream).collect(Collectors.toList()));

return preferredLocationsFuture.thenApply(
preferredLocations ->
SlotProfile.priorAllocation(
physicalSlotResourceProfile,
physicalSlotResourceProfile,
preferredLocations,
priorAllocations,
allBulkPriorAllocationIds));
return SlotProfile.priorAllocation(
physicalSlotResourceProfile,
physicalSlotResourceProfile,
preferredLocations,
priorAllocations,
allBulkPriorAllocationIds);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
* Computes a {@link SlotProfile} to allocate a slot for executions, sharing the slot.
Expand All @@ -35,9 +34,9 @@ interface SharedSlotProfileRetriever {
*
* @param executionSlotSharingGroup executions sharing the slot.
* @param physicalSlotResourceProfile {@link ResourceProfile} of the slot.
* @return a future of the {@link SlotProfile} to allocate for the {@code executionSlotSharingGroup}.
* @return {@link SlotProfile} to allocate for the {@code executionSlotSharingGroup}.
*/
CompletableFuture<SlotProfile> getSlotProfileFuture(
SlotProfile getSlotProfile(
ExecutionSlotSharingGroup executionSlotSharingGroup,
ResourceProfile physicalSlotResourceProfile);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,11 @@ private SharedSlot getOrAllocateSharedSlot(
.computeIfAbsent(executionSlotSharingGroup, group -> {
SlotRequestId physicalSlotRequestId = new SlotRequestId();
ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group);
CompletableFuture<PhysicalSlot> physicalSlotFuture = sharedSlotProfileRetriever
.getSlotProfileFuture(group, physicalSlotResourceProfile)
.thenCompose(slotProfile -> slotProvider.allocatePhysicalSlot(
new PhysicalSlotRequest(physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely)))
SlotProfile slotProfile = sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
PhysicalSlotRequest physicalSlotRequest =
new PhysicalSlotRequest(physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
CompletableFuture<PhysicalSlot> physicalSlotFuture = slotProvider
.allocatePhysicalSlot(physicalSlotRequest)
.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
return new SharedSlot(
physicalSlotRequestId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContex
context.getSchedulingTopology(),
context.getLogicalSlotSharingGroups(),
context.getCoLocationGroups());
PreferredLocationsRetriever preferredLocationsRetriever =
new DefaultPreferredLocationsRetriever(context, context);
SyncPreferredLocationsRetriever preferredLocationsRetriever =
new DefaultSyncPreferredLocationsRetriever(context, context);
SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory = new MergingSharedSlotProfileRetrieverFactory(
preferredLocationsRetriever,
context::getPriorAllocationId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.util.Collection;
import java.util.Set;

/**
* Component to retrieve the preferred locations of an execution vertex synchronously.
*
* <p>{@link #getPreferredLocations(ExecutionVertexID, Set)} returns the collection of
* locations directly rather than a future of it.
*/
@FunctionalInterface
public interface SyncPreferredLocationsRetriever {

/**
* Returns preferred locations of an execution vertex.
*
* @param executionVertexId id of the execution vertex
* @param producersToIgnore producer vertices to ignore when calculating input locations
* @return preferred locations of the execution vertex (the collection itself, not a future)
*/
Collection<TaskManagerLocation> getPreferredLocations(
ExecutionVertexID executionVertexId,
Set<ExecutionVertexID> producersToIgnore);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.Collection;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

/**
* Tests for {@link AvailableInputsLocationsRetriever}.
*/
public class AvailableInputsLocationsRetrieverTest extends TestLogger {
private static final ExecutionVertexID EV1 = createRandomExecutionVertexId();
private static final ExecutionVertexID EV2 = createRandomExecutionVertexId();

@Test
public void testNoInputLocation() {
TestingInputsLocationsRetriever originalLocationRetriever = getOriginalLocationRetriever();
InputsLocationsRetriever availableInputsLocationsRetriever =
new AvailableInputsLocationsRetriever(originalLocationRetriever);
assertThat(availableInputsLocationsRetriever.getTaskManagerLocation(EV1).isPresent(), is(false));
}

@Test
public void testNoInputLocationIfNotDone() {
TestingInputsLocationsRetriever originalLocationRetriever = getOriginalLocationRetriever();
originalLocationRetriever.markScheduled(EV1);
InputsLocationsRetriever availableInputsLocationsRetriever =
new AvailableInputsLocationsRetriever(originalLocationRetriever);
assertThat(availableInputsLocationsRetriever.getTaskManagerLocation(EV1).isPresent(), is(false));
}

@Test
public void testNoInputLocationIfFailed() {
TestingInputsLocationsRetriever originalLocationRetriever = getOriginalLocationRetriever();
originalLocationRetriever.failTaskManagerLocation(EV1, new Throwable());
InputsLocationsRetriever availableInputsLocationsRetriever =
new AvailableInputsLocationsRetriever(originalLocationRetriever);
assertThat(availableInputsLocationsRetriever.getTaskManagerLocation(EV1).isPresent(), is(false));
}

@Test
public void testInputLocationIfDone() {
TestingInputsLocationsRetriever originalLocationRetriever = getOriginalLocationRetriever();
originalLocationRetriever.assignTaskManagerLocation(EV1);
InputsLocationsRetriever availableInputsLocationsRetriever =
new AvailableInputsLocationsRetriever(originalLocationRetriever);
assertThat(availableInputsLocationsRetriever.getTaskManagerLocation(EV1).isPresent(), is(true));
}

@Test
public void testConsumedResultPartitionsProducers() {
TestingInputsLocationsRetriever originalLocationRetriever = getOriginalLocationRetriever();
InputsLocationsRetriever availableInputsLocationsRetriever =
new AvailableInputsLocationsRetriever(originalLocationRetriever);
Collection<Collection<ExecutionVertexID>> producers =
availableInputsLocationsRetriever.getConsumedResultPartitionsProducers(EV2);
assertThat(producers.size(), is(1));
Collection<ExecutionVertexID> resultProducers = producers.iterator().next();
assertThat(resultProducers.size(), is(1));
assertThat(resultProducers.iterator().next(), is(EV1));
}

private static TestingInputsLocationsRetriever getOriginalLocationRetriever() {
return new TestingInputsLocationsRetriever.Builder().connectConsumerToProducer(EV2, EV1).build();
}
}

0 comments on commit d4a0257

Please sign in to comment.