diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index f18d0d8cfc731..d826773520988 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -24,7 +24,6 @@ import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.scheduler.ConnectionMonitor; import org.apache.flink.mesos.scheduler.LaunchCoordinator; -import org.apache.flink.mesos.scheduler.LaunchableTask; import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; import org.apache.flink.mesos.scheduler.TaskMonitor; import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; @@ -79,6 +78,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -124,6 +124,8 @@ public class MesosResourceManager extends ResourceManager slotsPerWorker; + /** Mesos scheduler driver. */ private SchedulerDriver schedulerDriver; @@ -191,6 +193,9 @@ public MesosResourceManager( this.workersInNew = new HashMap<>(8); this.workersInLaunch = new HashMap<>(8); this.workersBeingReturned = new HashMap<>(8); + + final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters(); + this.slotsPerWorker = createSlotsPerWorker(containeredTaskManagerParameters.numSlots()); } protected ActorRef createSelfActor() { @@ -352,7 +357,7 @@ private void recoverWorkers(final List tasksFromPreviou switch(worker.state()) { case Launched: workersInLaunch.put(extractResourceID(worker.taskID()), worker); - final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile()); + final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID()); toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get())); break; case Released: @@ -426,7 +431,7 @@ protected void internalDeregisterApplication( } @Override - public void startNewWorker(ResourceProfile resourceProfile) { + public Collection startNewWorker(ResourceProfile resourceProfile) { LOG.info("Starting a new worker."); try { // generate new workers into persistent state and launch associated actors @@ -434,7 +439,7 @@ public void startNewWorker(ResourceProfile resourceProfile) { workerStore.putWorker(worker); workersInNew.put(extractResourceID(worker.taskID()), worker); - LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), resourceProfile); + LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID()); LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).", launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs()); @@ -443,9 +448,12 @@ public void startNewWorker(ResourceProfile resourceProfile) { taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor); // tell the launch coordinator to launch the new tasks - launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor); + launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList(launchable)), selfActor); + + return slotsPerWorker; } catch (Exception ex) { onFatalError(new ResourceManagerException("Unable to request new workers.", ex)); + return Collections.emptyList(); } } @@ -691,36 +699,13 @@ private CompletableFuture stopActor(@Nullable final ActorRef actorRef, /** * Creates a launchable task for Fenzo to process. */ - private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) { - - // create the specific TM parameters from the resource profile and some defaults - MesosTaskManagerParameters params = new MesosTaskManagerParameters( - resourceProfile.getCpuCores() < 1.0 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(), - taskManagerParameters.gpus(), - taskManagerParameters.containerType(), - taskManagerParameters.containerImageName(), - new ContaineredTaskManagerParameters( - ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(), - ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerHeapSizeMB() : resourceProfile.getHeapMemoryInMB(), - ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB() : resourceProfile.getDirectMemoryInMB(), - 1, - new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())), - taskManagerParameters.containerVolumes(), - taskManagerParameters.dockerParameters(), - taskManagerParameters.dockerForcePullImage(), - taskManagerParameters.constraints(), - taskManagerParameters.command(), - taskManagerParameters.bootstrapCommand(), - taskManagerParameters.getTaskManagerHostname(), - taskManagerParameters.uris() - ); - - LOG.debug("LaunchableMesosWorker parameters: {}", params); + private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) { + LOG.debug("LaunchableMesosWorker parameters: {}", taskManagerParameters); LaunchableMesosWorker launchable = new LaunchableMesosWorker( artifactServer, - params, + taskManagerParameters, taskManagerContainerSpec, taskID, mesosConfig); diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index e21f0fc33748c..5163724ed6dd8 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -242,7 +242,7 @@ static class Context implements AutoCloseable { TestingMesosResourceManager resourceManager; // domain objects for test purposes - final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 1); + final ResourceProfile resourceProfile1 = ResourceProfile.UNKNOWN; Protos.FrameworkID framework1 = Protos.FrameworkID.newBuilder().setValue("framework1").build(); public Protos.SlaveID slave1 = Protos.SlaveID.newBuilder().setValue("slave1").build(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java index a89b9f92b17c7..5b133e7b624fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -50,6 +50,9 @@ public class ResourceProfile implements Serializable, Comparable startNewWorker(ResourceProfile resourceProfile); /** * Callback when a worker was started. @@ -1051,9 +1053,9 @@ public void releaseResource(InstanceID instanceId, Exception cause) { } @Override - public void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { + public Collection allocateResource(ResourceProfile resourceProfile) { validateRunsInMainThread(); - startNewWorker(resourceProfile); + return startNewWorker(resourceProfile); } @Override @@ -1176,8 +1178,16 @@ public CompletableFuture retrievePayload(ResourceID resourceID) { // Resource Management // ------------------------------------------------------------------------ - protected int getNumberPendingSlotRequests() { - return slotManager.getNumberPendingSlotRequests(); + protected int getNumberRequiredTaskManagerSlots() { + return slotManager.getNumberPendingTaskManagerSlots(); + } + + // ------------------------------------------------------------------------ + // Helper methods + // ------------------------------------------------------------------------ + + protected static Collection createSlotsPerWorker(int numSlots) { + return Collections.nCopies(numSlots, ResourceProfile.ANY); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index 420b89f39f2f1..064c2d361d28e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -32,6 +32,9 @@ import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Collections; + /** * A standalone implementation of the resource manager. Used when the system is started in * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos. @@ -74,7 +77,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul } @Override - public void startNewWorker(ResourceProfile resourceProfile) { + public Collection startNewWorker(ResourceProfile resourceProfile) { + return Collections.emptyList(); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java index 17cf8c7907ecc..a8f212fe6d4f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java @@ -25,10 +25,14 @@ import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.concurrent.CompletableFuture; +/** + * Class representing a pending slot request in the {@link SlotManager}. + */ public class PendingSlotRequest { private final SlotRequest slotRequest; @@ -36,11 +40,16 @@ public class PendingSlotRequest { @Nullable private CompletableFuture requestFuture; + @Nullable + private PendingTaskManagerSlot pendingTaskManagerSlot; + /** Timestamp when this pending slot request has been created. */ private final long creationTimestamp; public PendingSlotRequest(SlotRequest slotRequest) { this.slotRequest = Preconditions.checkNotNull(slotRequest); + this.requestFuture = null; + this.pendingTaskManagerSlot = null; creationTimestamp = System.currentTimeMillis(); } @@ -78,4 +87,18 @@ public void setRequestFuture(@Nullable CompletableFuture requestFut public CompletableFuture getRequestFuture() { return requestFuture; } + + @Nullable + public PendingTaskManagerSlot getAssignedPendingTaskManagerSlot() { + return pendingTaskManagerSlot; + } + + public void assignPendingTaskManagerSlot(@Nonnull PendingTaskManagerSlot pendingTaskManagerSlotToAssign) { + Preconditions.checkState(pendingTaskManagerSlot == null); + this.pendingTaskManagerSlot = pendingTaskManagerSlotToAssign; + } + + public void unassignPendingTaskManagerSlot() { + this.pendingTaskManagerSlot = null; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java new file mode 100644 index 0000000000000..ed207e963e33c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Represents a pending task manager slot in the {@link SlotManager}. + */ +public class PendingTaskManagerSlot { + + private final TaskManagerSlotId taskManagerSlotId = TaskManagerSlotId.generate(); + + private final ResourceProfile resourceProfile; + + @Nullable + private PendingSlotRequest pendingSlotRequest; + + public PendingTaskManagerSlot(ResourceProfile resourceProfile) { + this.resourceProfile = resourceProfile; + } + + public TaskManagerSlotId getTaskManagerSlotId() { + return taskManagerSlotId; + } + + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + public void assignPendingSlotRequest(@Nonnull PendingSlotRequest pendingSlotRequestToAssign) { + Preconditions.checkState(pendingSlotRequest == null); + pendingSlotRequest = pendingSlotRequestToAssign; + } + + public void unassignPendingSlotRequest() { + pendingSlotRequest = null; + } + + @Nullable + public PendingSlotRequest getAssignedPendingSlotRequest() { + return pendingSlotRequest; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java index 84e7c4e785d48..adf8f13db57ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java @@ -24,6 +24,8 @@ import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import java.util.Collection; + /** * Resource related actions which the {@link SlotManager} can perform. */ @@ -41,9 +43,10 @@ public interface ResourceActions { * Requests to allocate a resource with the given {@link ResourceProfile}. * * @param resourceProfile for the to be allocated resource + * @return Collection of {@link ResourceProfile} describing the allocated slots * @throws ResourceManagerException if the resource cannot be allocated */ - void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException; + Collection allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException; /** * Notifies that an allocation failure has occurred. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index bab56609a1b77..2ef2b2fefcc36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -43,14 +43,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -99,6 +102,8 @@ public class SlotManager implements AutoCloseable { /** Map of pending/unfulfilled slot allocation requests. */ private final HashMap pendingSlotRequests; + private final HashMap pendingSlots; + /** ResourceManager's id. */ private ResourceManagerId resourceManagerId; @@ -130,6 +135,7 @@ public SlotManager( taskManagerRegistrations = new HashMap<>(4); fulfilledSlotRequests = new HashMap<>(16); pendingSlotRequests = new HashMap<>(16); + pendingSlots = new HashMap<>(16); resourceManagerId = null; resourceActions = null; @@ -168,8 +174,13 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) { } } - public int getNumberPendingSlotRequests() { - return pendingSlotRequests.size(); + public int getNumberPendingTaskManagerSlots() { + return pendingSlots.size(); + } + + @VisibleForTesting + int getNumberAssignedPendingTaskManagerSlots() { + return (int) pendingSlots.values().stream().filter(slot -> slot.getAssignedPendingSlotRequest() != null).count(); } // --------------------------------------------------------------------------------------------- @@ -530,14 +541,50 @@ private void registerSlot( removeSlot(slotId); } - TaskManagerSlot slot = new TaskManagerSlot( + final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection); + + final PendingTaskManagerSlot pendingTaskManagerSlot; + + if (allocationId == null) { + pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile); + } else { + pendingTaskManagerSlot = null; + } + + if (pendingTaskManagerSlot == null) { + updateSlot(slotId, allocationId, jobId); + } else { + pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId()); + final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest(); + + if (assignedPendingSlotRequest == null) { + handleFreeSlot(slot); + } else { + assignedPendingSlotRequest.unassignPendingTaskManagerSlot(); + allocateSlot(slot, assignedPendingSlotRequest); + } + } + } + + @Nonnull + private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) { + final TaskManagerSlot slot = new TaskManagerSlot( slotId, resourceProfile, taskManagerConnection); - slots.put(slotId, slot); + return slot; + } - updateSlot(slotId, allocationId, jobId); + @Nullable + private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) { + for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) { + if (pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) { + return pendingTaskManagerSlot; + } + } + + return null; } /** @@ -595,7 +642,11 @@ private void updateSlotState( slot.updateAllocation(allocationId, jobId); // remove the pending request if any as it has been assigned - pendingSlotRequests.remove(allocationId); + final PendingSlotRequest actualPendingSlotRequest = pendingSlotRequests.remove(allocationId); + + if (actualPendingSlotRequest != null) { + cancelPendingSlotRequest(actualPendingSlotRequest); + } // this will try to find a new slot for the request rejectPendingSlotRequest( @@ -650,13 +701,54 @@ private void updateSlotState( * @throws ResourceManagerException if the resource manager cannot allocate more resource */ private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException { - TaskManagerSlot taskManagerSlot = findMatchingSlot(pendingSlotRequest.getResourceProfile()); + final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile(); + TaskManagerSlot taskManagerSlot = findMatchingSlot(resourceProfile); if (taskManagerSlot != null) { allocateSlot(taskManagerSlot, pendingSlotRequest); } else { - resourceActions.allocateResource(pendingSlotRequest.getResourceProfile()); + Optional pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile); + + if (!pendingTaskManagerSlotOptional.isPresent()) { + pendingTaskManagerSlotOptional = allocateResource(resourceProfile); + } + + pendingTaskManagerSlotOptional.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot)); + } + } + + private Optional findFreeMatchingPendingTaskManagerSlot(ResourceProfile requiredResourceProfile) { + for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) { + if (pendingTaskManagerSlot.getAssignedPendingSlotRequest() == null && pendingTaskManagerSlot.getResourceProfile().isMatching(requiredResourceProfile)) { + return Optional.of(pendingTaskManagerSlot); + } } + + return Optional.empty(); + } + + private Optional allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { + final Collection requestedSlots = resourceActions.allocateResource(resourceProfile); + + if (requestedSlots.isEmpty()) { + return Optional.empty(); + } else { + final Iterator slotIterator = requestedSlots.iterator(); + final PendingTaskManagerSlot pendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next()); + pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot); + + while (slotIterator.hasNext()) { + final PendingTaskManagerSlot additionalPendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next()); + pendingSlots.put(additionalPendingTaskManagerSlot.getTaskManagerSlotId(), additionalPendingTaskManagerSlot); + } + + return Optional.of(pendingTaskManagerSlot); + } + } + + private void assignPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest, PendingTaskManagerSlot pendingTaskManagerSlot) { + pendingTaskManagerSlot.assignPendingSlotRequest(pendingSlotRequest); + pendingSlotRequest.assignPendingTaskManagerSlot(pendingTaskManagerSlot); } /** @@ -680,6 +772,8 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest); pendingSlotRequest.setRequestFuture(completableFuture); + returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest); + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID); if (taskManagerRegistration == null) { @@ -733,6 +827,14 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe mainThreadExecutor); } + private void returnPendingTaskManagerSlotIfAssigned(PendingSlotRequest pendingSlotRequest) { + final PendingTaskManagerSlot pendingTaskManagerSlot = pendingSlotRequest.getAssignedPendingTaskManagerSlot(); + if (pendingTaskManagerSlot != null) { + pendingTaskManagerSlot.unassignPendingSlotRequest(); + pendingSlotRequest.unassignPendingTaskManagerSlot(); + } + } + /** * Handles a free slot. It first tries to find a pending slot request which can be fulfilled. * If there is no such request, then it will add the slot to the set of free slots. @@ -886,6 +988,8 @@ private void rejectPendingSlotRequest(PendingSlotRequest pendingSlotRequest, Exc private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) { CompletableFuture request = pendingSlotRequest.getRequestFuture(); + returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest); + if (null != request) { request.cancel(false); } @@ -911,9 +1015,10 @@ private void checkTaskManagerTimeouts() { } // second we trigger the release resource callback which can decide upon the resource release + final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout."); for (InstanceID timedOutTaskManagerId : timedOutTaskManagerIds) { LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId); - resourceActions.releaseResource(timedOutTaskManagerId, new FlinkException("TaskExecutor exceeded the idle timeout.")); + resourceActions.releaseResource(timedOutTaskManagerId, cause); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java new file mode 100644 index 0000000000000..3084b3e8ed94b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot; +import org.apache.flink.util.AbstractID; + +/** + * Id of {@link TaskManagerSlot} and {@link PendingTaskManagerSlot}. + */ +public class TaskManagerSlotId extends AbstractID { + + private static final long serialVersionUID = -4024240625523472071L; + + private TaskManagerSlotId() {} + + public static TaskManagerSlotId generate() { + return new TaskManagerSlotId(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 0e98e44ab0d8e..c46d800bc9534 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -246,7 +246,7 @@ public static TaskManagerServices fromConfiguration( final List resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots()); for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) { - resourceProfiles.add(new ResourceProfile(1.0, 42)); + resourceProfiles.add(ResourceProfile.ANY); } final TimerService timerService = new TimerService<>( @@ -259,7 +259,6 @@ public static TaskManagerServices fromConfiguration( final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); - final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories(); final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length]; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java index 0b56231d4a4e2..e8207019a32ce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java @@ -32,6 +32,9 @@ import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Collections; + /** * Simple {@link ResourceManager} implementation for testing purposes. */ @@ -71,8 +74,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul } @Override - public void startNewWorker(ResourceProfile resourceProfile) { - // noop + public Collection startNewWorker(ResourceProfile resourceProfile) { + return Collections.emptyList(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 8a7f733a3fa96..33a696a1d242e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -54,7 +55,9 @@ import javax.annotation.Nonnull; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -67,6 +70,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -84,9 +88,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -103,9 +105,9 @@ public class SlotManagerTest extends TestLogger { @Test public void testTaskManagerRegistration() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final ResourceID resourceId = ResourceID.generate(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); @@ -135,14 +137,12 @@ public void testTaskManagerUnregistration() throws Exception { final ResourceActions resourceManagerActions = mock(ResourceActions.class); final JobID jobId = new JobID(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(new CompletableFuture<>()); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + assertThat(tuple5.f4, is(equalTo(resourceManagerId))); + return new CompletableFuture<>(); + }) + .createTestingTaskExecutorGateway(); final ResourceID resourceId = ResourceID.generate(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); @@ -202,13 +202,16 @@ public void testSlotRequestWithoutFreeSlots() throws Exception { resourceProfile, "localhost"); - ResourceActions resourceManagerActions = mock(ResourceActions.class); + CompletableFuture allocateResourceFuture = new CompletableFuture<>(); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(allocateResourceFuture::complete) + .build(); try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { slotManager.registerSlotRequest(slotRequest); - verify(resourceManagerActions).allocateResource(eq(resourceProfile)); + assertThat(allocateResourceFuture.get(), is(equalTo(resourceProfile))); } } @@ -225,8 +228,11 @@ public void testSlotRequestWithResourceAllocationFailure() throws Exception { resourceProfile, "localhost"); - ResourceActions resourceManagerActions = mock(ResourceActions.class); - doThrow(new ResourceManagerException("Test exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class)); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(value -> { + throw new ResourceManagerException("Test exception"); + }) + .build(); try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { @@ -257,19 +263,17 @@ public void testSlotRequestWithFreeSlot() throws Exception { resourceProfile, targetAddress); - ResourceActions resourceManagerActions = mock(ResourceActions.class); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - + final CompletableFuture> requestFuture = new CompletableFuture<>(); // accept an incoming slot request - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - eq(slotId), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4)); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); @@ -282,7 +286,7 @@ public void testSlotRequestWithFreeSlot() throws Exception { assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest)); - verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId)))); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -302,14 +306,9 @@ public void testUnregisterPendingSlotRequest() throws Exception { final SlotID slotId = new SlotID(resourceID, 0); final AllocationID allocationId = new AllocationID(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(new CompletableFuture<>()); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> new CompletableFuture<>()) + .createTestingTaskExecutorGateway(); final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); @@ -357,17 +356,19 @@ public void testFulfillingPendingSlotRequest() throws Exception { resourceProfile, targetAddress); - ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger numberAllocateResourceCalls = new AtomicInteger(0); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(ignored -> numberAllocateResourceCalls.incrementAndGet()) + .build(); + final CompletableFuture> requestFuture = new CompletableFuture<>(); // accept an incoming slot request - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - eq(slotId), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4)); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); @@ -378,13 +379,13 @@ public void testFulfillingPendingSlotRequest() throws Exception { assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest)); - verify(resourceManagerActions, times(1)).allocateResource(eq(resourceProfile)); + assertThat(numberAllocateResourceCalls.get(), is(1)); slotManager.registerTaskManager( taskExecutorConnection, slotReport); - verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId)))); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -444,7 +445,10 @@ public void testFreeSlot() throws Exception { @Test public void testDuplicatePendingSlotRequest() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger numberAllocateResourceFunctionCalls = new AtomicInteger(0); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfile -> numberAllocateResourceFunctionCalls.incrementAndGet()) + .build(); final AllocationID allocationId = new AllocationID(); final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); @@ -458,7 +462,7 @@ public void testDuplicatePendingSlotRequest() throws Exception { // check that we have only called the resource allocation only for the first slot request, // since the second request is a duplicate - verify(resourceManagerActions, times(1)).allocateResource(any(ResourceProfile.class)); + assertThat(numberAllocateResourceFunctionCalls.get(), is(1)); } /** @@ -497,21 +501,17 @@ public void testDuplicatePendingSlotRequestAfterSlotReport() throws Exception { @Test public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger allocateResourceCalls = new AtomicInteger(0); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet()) + .build(); final AllocationID allocationId = new AllocationID(); final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar"); final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo"); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final ResourceID resourceID = ResourceID.generate(); @@ -534,7 +534,7 @@ public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Ex // check that we have only called the resource allocation only for the first slot request, // since the second request is a duplicate - verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class)); + assertThat(allocateResourceCalls.get(), is(0)); } /** @@ -544,21 +544,17 @@ public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Ex @Test public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger allocateResourceCalls = new AtomicInteger(0); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet()) + .build(); final AllocationID allocationId = new AllocationID(); final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar"); final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo"); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final ResourceID resourceID = ResourceID.generate(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); @@ -588,7 +584,7 @@ public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exc // check that we have only called the resource allocation only for the first slot request, // since the second request is a duplicate - verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class)); + assertThat(allocateResourceCalls.get(), is(0)); } /** @@ -624,7 +620,7 @@ public void testReceivingUnknownSlotReport() throws Exception { @Test public void testUpdateSlotReport() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); final JobID jobId = new JobID(); final AllocationID allocationId = new AllocationID(); @@ -678,13 +674,16 @@ public void testUpdateSlotReport() throws Exception { */ @Test public void testTaskManagerTimeout() throws Exception { - final long tmTimeout = 500L; + final long tmTimeout = 10L; - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final CompletableFuture releaseFuture = new CompletableFuture<>(); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID)) + .build(); final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final ResourceID resourceID = ResourceID.generate(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); final SlotID slotId = new SlotID(resourceID, 0); @@ -702,15 +701,9 @@ public void testTaskManagerTimeout() throws Exception { slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions); - mainThreadExecutor.execute(new Runnable() { - @Override - public void run() { - slotManager.registerTaskManager(taskManagerConnection, slotReport); - } - }); + mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport)); - verify(resourceManagerActions, timeout(100L * tmTimeout).times(1)) - .releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class)); + assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID()))); } } @@ -723,7 +716,10 @@ public void run() { public void testSlotRequestTimeout() throws Exception { final long allocationTimeout = 50L; - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final CompletableFuture> failedAllocationFuture = new CompletableFuture<>(); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setNotifyAllocationFailureConsumer(tuple3 -> failedAllocationFuture.complete(Tuple2.of(tuple3.f0, tuple3.f1))) + .build(); final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final JobID jobId = new JobID(); final AllocationID allocationId = new AllocationID(); @@ -743,21 +739,15 @@ public void testSlotRequestTimeout() throws Exception { final AtomicReference atomicException = new AtomicReference<>(null); - mainThreadExecutor.execute(new Runnable() { - @Override - public void run() { - try { - assertTrue(slotManager.registerSlotRequest(slotRequest)); - } catch (Exception e) { - atomicException.compareAndSet(null, e); - } + mainThreadExecutor.execute(() -> { + try { + assertTrue(slotManager.registerSlotRequest(slotRequest)); + } catch (Exception e) { + atomicException.compareAndSet(null, e); } }); - verify(resourceManagerActions, timeout(100L * allocationTimeout).times(1)).notifyAllocationFailure( - eq(jobId), - eq(allocationId), - any(TimeoutException.class)); + assertThat(failedAllocationFuture.get(), is(equalTo(Tuple2.of(jobId, allocationId)))); if (atomicException.get() != null) { throw atomicException.get(); @@ -851,7 +841,7 @@ public void testTaskManagerSlotRequestTimeoutHandling() throws Exception { public void testSlotReportWhileActiveSlotRequest() throws Exception { final long verifyTimeout = 10000L; final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); final JobID jobId = new JobID(); final AllocationID allocationId = new AllocationID(); @@ -966,10 +956,12 @@ public void testSlotReportWhileActiveSlotRequest() throws Exception { @Test public void testTimeoutForUnusedTaskManager() throws Exception { final long taskManagerTimeout = 50L; - final long verifyTimeout = taskManagerTimeout * 10L; + final CompletableFuture releasedResourceFuture = new CompletableFuture<>(); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID)) + .build(); final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor(); final ResourceID resourceId = ResourceID.generate(); @@ -979,14 +971,13 @@ public void testTimeoutForUnusedTaskManager() throws Exception { final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar"); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final CompletableFuture requestedSlotFuture = new CompletableFuture<>(); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestedSlotFuture.complete(tuple5.f0); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); @@ -1015,17 +1006,9 @@ public void testTimeoutForUnusedTaskManager() throws Exception { } }, mainThreadExecutor) - .thenAccept((Object value) -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport)); + .thenRun(() -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport)); - ArgumentCaptor slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class); - - verify(taskExecutorGateway, timeout(verifyTimeout)).requestSlot( - slotIdArgumentCaptor.capture(), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class)); + final SlotID slotId = requestedSlotFuture.get(); CompletableFuture idleFuture = CompletableFuture.supplyAsync( () -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()), @@ -1034,8 +1017,6 @@ public void testTimeoutForUnusedTaskManager() throws Exception { // check that the TaskManager is not idle assertFalse(idleFuture.get()); - final SlotID slotId = slotIdArgumentCaptor.getValue(); - CompletableFuture slotFuture = CompletableFuture.supplyAsync( () -> slotManager.getSlot(slotId), mainThreadExecutor); @@ -1052,7 +1033,7 @@ public void testTimeoutForUnusedTaskManager() throws Exception { assertTrue(idleFuture2.get()); - verify(resourceManagerActions, timeout(verifyTimeout).times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class)); + assertThat(releasedResourceFuture.get(), is(equalTo(taskManagerConnection.getInstanceID()))); } } @@ -1109,7 +1090,7 @@ public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception { @Test public void testReportAllocatedSlot() throws Exception { final ResourceID taskManagerId = ResourceID.generate(); - final ResourceActions resourceActions = mock(ResourceActions.class); + final ResourceActions resourceActions = new TestingResourceActionsBuilder().build(); final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, taskExecutorGateway); @@ -1167,7 +1148,7 @@ public void testReportAllocatedSlot() throws Exception { @Test public void testSlotRequestFailure() throws Exception { try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), - new TestingResourceActionsBuilder().createTestingResourceActions())) { + new TestingResourceActionsBuilder().build())) { final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar"); slotManager.registerSlotRequest(slotRequest); @@ -1222,7 +1203,7 @@ public void testSlotRequestFailure() throws Exception { @Test public void testSlotRequestRemovedIfTMReportAllocation() throws Exception { try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), - new TestingResourceActionsBuilder().createTestingResourceActions())) { + new TestingResourceActionsBuilder().build())) { final JobID jobID = new JobID(); final SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar"); @@ -1293,7 +1274,7 @@ public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Excepti .setNotifyAllocationFailureConsumer( (Tuple3 failureMessage) -> allocationFailures.offer(Tuple2.of(failureMessage.f0, failureMessage.f1))) - .createTestingResourceActions(); + .build(); try (final SlotManager slotManager = createSlotManager( ResourceManagerId.generate(), @@ -1374,17 +1355,27 @@ private Set extractFailedAllocationsForJob(JobID jobId2, Map slotStatusSet = new HashSet<>(numberSlots); for (int i = 0; i < numberSlots; i++) { - slotStatusSet.add(new SlotStatus(new SlotID(taskExecutorResourceId, i), ResourceProfile.UNKNOWN)); + slotStatusSet.add(new SlotStatus(new SlotID(taskExecutorResourceId, i), resourceProfile)); } return new SlotReport(slotStatusSet); } @Nonnull - private SlotRequest createSlotRequest(JobID jobId1) { - return new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1"); + private SlotRequest createSlotRequest(JobID jobId) { + return createSlotRequest(jobId, ResourceProfile.UNKNOWN); + } + + @Nonnull + private SlotRequest createSlotRequest(JobID jobId, ResourceProfile resourceProfile) { + return new SlotRequest(jobId, new AllocationID(), resourceProfile, "foobar1"); } private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) { @@ -1398,4 +1389,171 @@ private SlotManager createSlotManager(ResourceManagerId resourceManagerId, Resou return slotManager; } + + /** + * Tests that we only request new resources/containers once we have assigned + * all pending task manager slots. + */ + @Test + public void testRequestNewResources() throws Exception { + final int numberSlots = 2; + final AtomicInteger resourceRequests = new AtomicInteger(0); + final TestingResourceActions testingResourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction( + convert(ignored -> { + resourceRequests.incrementAndGet(); + return numberSlots; + })) + .build(); + + try (final SlotManager slotManager = createSlotManager( + ResourceManagerId.generate(), + testingResourceActions)) { + + final JobID jobId = new JobID(); + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + assertThat(resourceRequests.get(), is(1)); + + // the second slot request should not try to allocate a new resource because the + // previous resource was started with 2 slots. + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + assertThat(resourceRequests.get(), is(1)); + + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(2)); + + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + assertThat(resourceRequests.get(), is(2)); + } + } + + /** + * Tests that a failing allocation/slot request will return the pending task manager slot. + */ + @Test + public void testFailingAllocationReturnsPendingTaskManagerSlot() throws Exception { + final int numberSlots = 2; + final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(convert(value -> numberSlots)) + .build(); + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + final JobID jobId = new JobID(); + + final SlotRequest slotRequest = createSlotRequest(jobId); + assertThat(slotManager.registerSlotRequest(slotRequest), is(true)); + + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1)); + + slotManager.unregisterSlotRequest(slotRequest.getAllocationId()); + + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0)); + } + } + + /** + * Tests the completion of pending task manager slots by registering a TaskExecutor. + */ + @Test + public void testPendingTaskManagerSlotCompletion() throws Exception { + final int numberSlots = 3; + final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(convert(value -> numberSlots)) + .build(); + + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + final JobID jobId = new JobID(); + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1)); + assertThat(slotManager.getNumberRegisteredSlots(), is(0)); + + final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection(); + final SlotReport slotReport = createSlotReport(taskExecutorConnection.getResourceID(), numberSlots - 1); + + slotManager.registerTaskManager(taskExecutorConnection, slotReport); + + assertThat(slotManager.getNumberRegisteredSlots(), is(numberSlots - 1)); + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(1)); + } + } + + private TaskExecutorConnection createTaskExecutorConnection() { + final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); + return new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway); + } + + /** + * Tests that a different slot can fulfill a pending slot request. If the + * pending slot request has a pending task manager slot assigned, it should + * be freed. + */ + @Test + public void testRegistrationOfDifferentSlot() throws Exception { + final int numberSlots = 1; + final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(convert(value -> numberSlots)) + .build(); + + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + final JobID jobId = new JobID(); + final ResourceProfile requestedSlotProfile = new ResourceProfile(1.0, 1); + + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId, requestedSlotProfile)), is(true)); + + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + + final int numberOfferedSlots = 1; + final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection(); + final ResourceProfile offeredSlotProfile = new ResourceProfile(2.0, 2); + final SlotReport slotReport = createSlotReport(taskExecutorConnection.getResourceID(), numberOfferedSlots, offeredSlotProfile); + + slotManager.registerTaskManager(taskExecutorConnection, slotReport); + + assertThat(slotManager.getNumberRegisteredSlots(), is(numberOfferedSlots)); + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0)); + } + } + + /** + * Tests that only free slots can fulfill/complete a pending task manager slot. + */ + @Test + public void testOnlyFreeSlotsCanFulfillPendingTaskManagerSlot() throws Exception { + final int numberSlots = 1; + final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(convert(value -> numberSlots)) + .build(); + + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + final JobID jobId = new JobID(); + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + + final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection(); + final SlotID slotId = new SlotID(taskExecutorConnection.getResourceID(), 0); + final SlotStatus slotStatus = new SlotStatus(slotId, ResourceProfile.UNKNOWN, jobId, new AllocationID()); + final SlotReport slotReport = new SlotReport(slotStatus); + + slotManager.registerTaskManager(taskExecutorConnection, slotReport); + + assertThat(slotManager.getNumberRegisteredSlots(), is(1)); + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1)); + } + } + + private static FunctionWithException, ResourceManagerException> convert(FunctionWithException function) { + return (ResourceProfile resourceProfile) -> { + final int slots = function.apply(resourceProfile); + + final ArrayList result = new ArrayList<>(slots); + for (int i = 0; i < slots; i++) { + result.add(resourceProfile); + } + + return result; + }; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 8f6317c427f27..66966cc3efff3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -33,13 +33,13 @@ import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; -import org.mockito.Mockito; import java.util.Collections; import java.util.concurrent.CompletableFuture; @@ -47,12 +47,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +/** + * Tests for the slot allocation protocol. + */ public class SlotProtocolTest extends TestLogger { private static final long timeout = 10000L; @@ -87,7 +88,10 @@ public void testSlotsUnavailableRequest() throws Exception { TestingUtils.infiniteTime(), TestingUtils.infiniteTime())) { - ResourceActions resourceManagerActions = mock(ResourceActions.class); + final CompletableFuture resourceProfileFuture = new CompletableFuture<>(); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfileFuture::complete) + .build(); slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions); @@ -99,14 +103,16 @@ public void testSlotsUnavailableRequest() throws Exception { slotManager.registerSlotRequest(slotRequest); - verify(resourceManagerActions).allocateResource(eq(slotRequest.getResourceProfile())); + assertThat(resourceProfileFuture.get(), is(equalTo(slotRequest.getResourceProfile()))); // slot becomes available - TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - Mockito.when( - taskExecutorGateway - .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class))) - .thenReturn(mock(CompletableFuture.class)); + final CompletableFuture> requestFuture = new CompletableFuture<>(); + TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2)); + return new CompletableFuture<>(); + }) + .createTestingTaskExecutorGateway(); final ResourceID resourceID = ResourceID.generate(); final SlotID slotID = new SlotID(resourceID, 0); @@ -119,8 +125,7 @@ public void testSlotsUnavailableRequest() throws Exception { slotManager.registerTaskManager(new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport); // 4) Slot becomes available and TaskExecutor gets a SlotRequest - verify(taskExecutorGateway, timeout(5000L)) - .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID)))); } } @@ -137,11 +142,13 @@ public void testSlotAvailableRequest() throws Exception { final ResourceManagerId rmLeaderID = ResourceManagerId.generate(); - TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - Mockito.when( - taskExecutorGateway - .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class))) - .thenReturn(mock(CompletableFuture.class)); + final CompletableFuture> requestFuture = new CompletableFuture<>(); + TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2)); + return new CompletableFuture<>(); + }) + .createTestingTaskExecutorGateway(); try (SlotManager slotManager = new SlotManager( scheduledExecutor, @@ -149,7 +156,7 @@ public void testSlotAvailableRequest() throws Exception { TestingUtils.infiniteTime(), TestingUtils.infiniteTime())) { - ResourceActions resourceManagerActions = mock(ResourceActions.class); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions); @@ -172,8 +179,7 @@ public void testSlotAvailableRequest() throws Exception { slotManager.registerSlotRequest(slotRequest); // a SlotRequest is routed to the TaskExecutor - verify(taskExecutorGateway, timeout(5000)) - .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID)))); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java index 8b7c8026f92cb..4c6f14c1c4050 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java @@ -23,9 +23,12 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.function.FunctionWithException; import javax.annotation.Nonnull; +import java.util.Collection; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -38,29 +41,28 @@ public class TestingResourceActions implements ResourceActions { private final BiConsumer releaseResourceConsumer; @Nonnull - private final Consumer allocateResourceConsumer; + private final FunctionWithException, ResourceManagerException> allocateResourceFunction; @Nonnull private final Consumer> notifyAllocationFailureConsumer; public TestingResourceActions( @Nonnull BiConsumer releaseResourceConsumer, - @Nonnull Consumer allocateResourceConsumer, + @Nonnull FunctionWithException, ResourceManagerException> allocateResourceFunction, @Nonnull Consumer> notifyAllocationFailureConsumer) { this.releaseResourceConsumer = releaseResourceConsumer; - this.allocateResourceConsumer = allocateResourceConsumer; + this.allocateResourceFunction = allocateResourceFunction; this.notifyAllocationFailureConsumer = notifyAllocationFailureConsumer; } - @Override public void releaseResource(InstanceID instanceId, Exception cause) { releaseResourceConsumer.accept(instanceId, cause); } @Override - public void allocateResource(ResourceProfile resourceProfile) { - allocateResourceConsumer.accept(resourceProfile); + public Collection allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { + return allocateResourceFunction.apply(resourceProfile); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java index 2c1d47e8c88fa..ac7afd4283efb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java @@ -23,7 +23,11 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.function.FunctionWithException; +import java.util.Collection; +import java.util.Collections; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -32,7 +36,7 @@ */ public class TestingResourceActionsBuilder { private BiConsumer releaseResourceConsumer = (ignoredA, ignoredB) -> {}; - private Consumer allocateResourceConsumer = (ignored) -> {}; + private FunctionWithException, ResourceManagerException> allocateResourceFunction = (ignored) -> Collections.singleton(ResourceProfile.UNKNOWN); private Consumer> notifyAllocationFailureConsumer = (ignored) -> {}; public TestingResourceActionsBuilder setReleaseResourceConsumer(BiConsumer releaseResourceConsumer) { @@ -40,8 +44,16 @@ public TestingResourceActionsBuilder setReleaseResourceConsumer(BiConsumer, ResourceManagerException> allocateResourceFunction) { + this.allocateResourceFunction = allocateResourceFunction; + return this; + } + public TestingResourceActionsBuilder setAllocateResourceConsumer(Consumer allocateResourceConsumer) { - this.allocateResourceConsumer = allocateResourceConsumer; + this.allocateResourceFunction = (ResourceProfile resourceProfile) -> { + allocateResourceConsumer.accept(resourceProfile); + return Collections.singleton(ResourceProfile.UNKNOWN); + }; return this; } @@ -50,7 +62,7 @@ public TestingResourceActionsBuilder setNotifyAllocationFailureConsumer(Consumer return this; } - public TestingResourceActions createTestingResourceActions() { - return new TestingResourceActions(releaseResourceConsumer, allocateResourceConsumer, notifyAllocationFailureConsumer); + public TestingResourceActions build() { + return new TestingResourceActions(releaseResourceConsumer, allocateResourceFunction, notifyAllocationFailureConsumer); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 956e40fe61b6e..2a43b8b7f1a60 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -58,6 +58,7 @@ import javax.annotation.Nullable; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -115,6 +116,8 @@ public class YarnResourceManager extends ResourceManager impleme private final Map resourcePriorities = new HashMap<>(); + private final Collection slotsPerWorker; + public YarnResourceManager( RpcService rpcService, String resourceManagerEndpointId, @@ -163,6 +166,8 @@ public YarnResourceManager( this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes(); this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots); + + this.slotsPerWorker = createSlotsPerWorker(numberOfTaskSlots); } protected AMRMClientAsync createAndStartResourceManagerClient( @@ -283,7 +288,7 @@ protected void internalDeregisterApplication( } @Override - public void startNewWorker(ResourceProfile resourceProfile) { + public Collection startNewWorker(ResourceProfile resourceProfile) { // Priority for worker containers - priorities are intra-application //TODO: set priority according to the resource allocated Priority priority = Priority.newInstance(generatePriority(resourceProfile)); @@ -291,6 +296,8 @@ public void startNewWorker(ResourceProfile resourceProfile) { int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus : (int) resourceProfile.getCpuCores(); Resource capability = Resource.newInstance(mem, vcore); requestYarnContainer(capability, priority); + + return slotsPerWorker; } @Override @@ -334,7 +341,7 @@ public void onContainersCompleted(final List statuses) { if (yarnWorkerNode != null) { // Container completed unexpectedly ~> start a new one final Container container = yarnWorkerNode.getContainer(); - requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority()); + requestYarnContainerIfRequired(container.getResource(), yarnWorkerNode.getContainer().getPriority()); } // Eagerly close the connection with task manager. closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics())); @@ -375,7 +382,7 @@ public void onContainersAllocated(List containers) { workerNodeMap.remove(resourceId); resourceManagerClient.releaseAssignedContainer(container.getId()); // and ask for a new one - requestYarnContainer(container.getResource(), container.getPriority()); + requestYarnContainerIfRequired(container.getResource(), container.getPriority()); } } else { // return the excessive containers @@ -446,21 +453,26 @@ private static Tuple2 parseHostPort(String address) { /** * Request new container if pending containers cannot satisfies pending slot requests. */ + private void requestYarnContainerIfRequired(Resource resource, Priority priority) { + int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots(); + int pendingTaskManagerSlots = numPendingContainerRequests * numberOfTaskSlots; + + if (requiredTaskManagerSlots > pendingTaskManagerSlots) { + requestYarnContainer(resource, priority); + } + } + private void requestYarnContainer(Resource resource, Priority priority) { - int pendingSlotRequests = getNumberPendingSlotRequests(); - int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots; - if (pendingSlotRequests > pendingSlotAllocation) { - resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority)); + resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority)); - // make sure we transmit the request fast and receive fast news of granted allocations - resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS); + // make sure we transmit the request fast and receive fast news of granted allocations + resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS); - numPendingContainerRequests++; + numPendingContainerRequests++; - log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.", - resource, - numPendingContainerRequests); - } + log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.", + resource, + numPendingContainerRequests); } private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)