From 6d5066866dbac1da3b591eb63f290160374add59 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 22 Aug 2018 13:51:38 +0200 Subject: [PATCH 1/5] [FLINK-9455][RM] Add support for multi task slot TaskExecutors Extend ResourceActions interface to return a set of ResourceProfiles describing the set of slots with which the new resource will be started. The SlotManager stores them as PendingTaskManagerSlots which can be assigned to PendingSlotRequests. Only if there are no more TaskManagerSlots and PendingTaskManagerSlots, the SlotManager will request new resources from the ResourceManager. This closes #6734. --- .../MesosResourceManager.java | 43 +-- .../types/ResourceProfile.java | 3 + .../resourcemanager/ResourceManager.java | 20 +- .../StandaloneResourceManager.java | 6 +- .../slotmanager/PendingSlotRequest.java | 23 ++ .../slotmanager/PendingTaskManagerSlot.java | 64 +++++ .../slotmanager/ResourceActions.java | 5 +- .../slotmanager/SlotManager.java | 117 ++++++++- .../slotmanager/TaskManagerSlotId.java | 36 +++ .../taskexecutor/TaskManagerServices.java | 3 +- .../TestingResourceManager.java | 7 +- .../slotmanager/SlotManagerTest.java | 248 ++++++++++++++++-- .../slotmanager/SlotProtocolTest.java | 10 +- .../slotmanager/TestingResourceActions.java | 14 +- .../TestingResourceActionsBuilder.java | 20 +- .../flink/yarn/YarnResourceManager.java | 40 ++- 16 files changed, 556 insertions(+), 103 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index f18d0d8cfc731..e7a5c98791f44 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -24,7 +24,6 @@ import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.scheduler.ConnectionMonitor; import org.apache.flink.mesos.scheduler.LaunchCoordinator; -import org.apache.flink.mesos.scheduler.LaunchableTask; import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; import org.apache.flink.mesos.scheduler.TaskMonitor; import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; @@ -79,6 +78,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -124,6 +124,8 @@ public class MesosResourceManager extends ResourceManager slotsPerWorker; + /** Mesos scheduler driver. */ private SchedulerDriver schedulerDriver; @@ -191,6 +193,9 @@ public MesosResourceManager( this.workersInNew = new HashMap<>(8); this.workersInLaunch = new HashMap<>(8); this.workersBeingReturned = new HashMap<>(8); + + final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters(); + this.slotsPerWorker = createSlotsPerWorker(containeredTaskManagerParameters.numSlots()); } protected ActorRef createSelfActor() { @@ -426,7 +431,7 @@ protected void internalDeregisterApplication( } @Override - public void startNewWorker(ResourceProfile resourceProfile) { + public Collection startNewWorker(ResourceProfile resourceProfile) { LOG.info("Starting a new worker."); try { // generate new workers into persistent state and launch associated actors @@ -443,9 +448,12 @@ public void startNewWorker(ResourceProfile resourceProfile) { taskMonitor.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), selfActor); // tell the launch coordinator to launch the new tasks - launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList((LaunchableTask) launchable)), selfActor); + launchCoordinator.tell(new LaunchCoordinator.Launch(Collections.singletonList(launchable)), selfActor); + + return slotsPerWorker; } catch (Exception ex) { onFatalError(new ResourceManagerException("Unable to request new workers.", ex)); + return Collections.emptyList(); } } @@ -691,36 +699,13 @@ private CompletableFuture stopActor(@Nullable final ActorRef actorRef, /** * Creates a launchable task for Fenzo to process. */ - private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID, ResourceProfile resourceProfile) { - - // create the specific TM parameters from the resource profile and some defaults - MesosTaskManagerParameters params = new MesosTaskManagerParameters( - resourceProfile.getCpuCores() < 1.0 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(), - taskManagerParameters.gpus(), - taskManagerParameters.containerType(), - taskManagerParameters.containerImageName(), - new ContaineredTaskManagerParameters( - ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB() : resourceProfile.getMemoryInMB(), - ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerHeapSizeMB() : resourceProfile.getHeapMemoryInMB(), - ResourceProfile.UNKNOWN.equals(resourceProfile) ? taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB() : resourceProfile.getDirectMemoryInMB(), - 1, - new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())), - taskManagerParameters.containerVolumes(), - taskManagerParameters.dockerParameters(), - taskManagerParameters.dockerForcePullImage(), - taskManagerParameters.constraints(), - taskManagerParameters.command(), - taskManagerParameters.bootstrapCommand(), - taskManagerParameters.getTaskManagerHostname(), - taskManagerParameters.uris() - ); - - LOG.debug("LaunchableMesosWorker parameters: {}", params); + private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) { + LOG.debug("LaunchableMesosWorker parameters: {}", taskManagerParameters); LaunchableMesosWorker launchable = new LaunchableMesosWorker( artifactServer, - params, + taskManagerParameters, taskManagerContainerSpec, taskID, mesosConfig); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java index a89b9f92b17c7..5b133e7b624fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -50,6 +50,9 @@ public class ResourceProfile implements Serializable, Comparable startNewWorker(ResourceProfile resourceProfile); /** * Callback when a worker was started. @@ -1051,9 +1053,9 @@ public void releaseResource(InstanceID instanceId, Exception cause) { } @Override - public void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { + public Collection allocateResource(ResourceProfile resourceProfile) { validateRunsInMainThread(); - startNewWorker(resourceProfile); + return startNewWorker(resourceProfile); } @Override @@ -1176,8 +1178,16 @@ public CompletableFuture retrievePayload(ResourceID resourceID) { // Resource Management // ------------------------------------------------------------------------ - protected int getNumberPendingSlotRequests() { - return slotManager.getNumberPendingSlotRequests(); + protected int getNumberRequiredTaskManagerSlots() { + return slotManager.getNumberPendingTaskManagerSlots(); + } + + // ------------------------------------------------------------------------ + // Helper methods + // ------------------------------------------------------------------------ + + protected static Collection createSlotsPerWorker(int numSlots) { + return Collections.nCopies(numSlots, ResourceProfile.ANY); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index 420b89f39f2f1..064c2d361d28e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -32,6 +32,9 @@ import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Collections; + /** * A standalone implementation of the resource manager. Used when the system is started in * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos. @@ -74,7 +77,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul } @Override - public void startNewWorker(ResourceProfile resourceProfile) { + public Collection startNewWorker(ResourceProfile resourceProfile) { + return Collections.emptyList(); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java index 17cf8c7907ecc..a8f212fe6d4f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java @@ -25,10 +25,14 @@ import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.concurrent.CompletableFuture; +/** + * Class representing a pending slot request in the {@link SlotManager}. + */ public class PendingSlotRequest { private final SlotRequest slotRequest; @@ -36,11 +40,16 @@ public class PendingSlotRequest { @Nullable private CompletableFuture requestFuture; + @Nullable + private PendingTaskManagerSlot pendingTaskManagerSlot; + /** Timestamp when this pending slot request has been created. */ private final long creationTimestamp; public PendingSlotRequest(SlotRequest slotRequest) { this.slotRequest = Preconditions.checkNotNull(slotRequest); + this.requestFuture = null; + this.pendingTaskManagerSlot = null; creationTimestamp = System.currentTimeMillis(); } @@ -78,4 +87,18 @@ public void setRequestFuture(@Nullable CompletableFuture requestFut public CompletableFuture getRequestFuture() { return requestFuture; } + + @Nullable + public PendingTaskManagerSlot getAssignedPendingTaskManagerSlot() { + return pendingTaskManagerSlot; + } + + public void assignPendingTaskManagerSlot(@Nonnull PendingTaskManagerSlot pendingTaskManagerSlotToAssign) { + Preconditions.checkState(pendingTaskManagerSlot == null); + this.pendingTaskManagerSlot = pendingTaskManagerSlotToAssign; + } + + public void unassignPendingTaskManagerSlot() { + this.pendingTaskManagerSlot = null; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java new file mode 100644 index 0000000000000..ed207e963e33c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingTaskManagerSlot.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Represents a pending task manager slot in the {@link SlotManager}. + */ +public class PendingTaskManagerSlot { + + private final TaskManagerSlotId taskManagerSlotId = TaskManagerSlotId.generate(); + + private final ResourceProfile resourceProfile; + + @Nullable + private PendingSlotRequest pendingSlotRequest; + + public PendingTaskManagerSlot(ResourceProfile resourceProfile) { + this.resourceProfile = resourceProfile; + } + + public TaskManagerSlotId getTaskManagerSlotId() { + return taskManagerSlotId; + } + + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + public void assignPendingSlotRequest(@Nonnull PendingSlotRequest pendingSlotRequestToAssign) { + Preconditions.checkState(pendingSlotRequest == null); + pendingSlotRequest = pendingSlotRequestToAssign; + } + + public void unassignPendingSlotRequest() { + pendingSlotRequest = null; + } + + @Nullable + public PendingSlotRequest getAssignedPendingSlotRequest() { + return pendingSlotRequest; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java index 84e7c4e785d48..adf8f13db57ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java @@ -24,6 +24,8 @@ import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import java.util.Collection; + /** * Resource related actions which the {@link SlotManager} can perform. */ @@ -41,9 +43,10 @@ public interface ResourceActions { * Requests to allocate a resource with the given {@link ResourceProfile}. * * @param resourceProfile for the to be allocated resource + * @return Collection of {@link ResourceProfile} describing the allocated slots * @throws ResourceManagerException if the resource cannot be allocated */ - void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException; + Collection allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException; /** * Notifies that an allocation failure has occurred. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index bab56609a1b77..2fa1844e027f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -43,14 +43,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -99,6 +102,8 @@ public class SlotManager implements AutoCloseable { /** Map of pending/unfulfilled slot allocation requests. */ private final HashMap pendingSlotRequests; + private final HashMap pendingSlots; + /** ResourceManager's id. */ private ResourceManagerId resourceManagerId; @@ -130,6 +135,7 @@ public SlotManager( taskManagerRegistrations = new HashMap<>(4); fulfilledSlotRequests = new HashMap<>(16); pendingSlotRequests = new HashMap<>(16); + pendingSlots = new HashMap<>(16); resourceManagerId = null; resourceActions = null; @@ -168,8 +174,13 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) { } } - public int getNumberPendingSlotRequests() { - return pendingSlotRequests.size(); + public int getNumberPendingTaskManagerSlots() { + return pendingSlots.size(); + } + + @VisibleForTesting + int getNumberAssignedPendingTaskManagerSlots() { + return (int) pendingSlots.values().stream().filter(slot -> slot.getAssignedPendingSlotRequest() != null).count(); } // --------------------------------------------------------------------------------------------- @@ -530,14 +541,50 @@ private void registerSlot( removeSlot(slotId); } - TaskManagerSlot slot = new TaskManagerSlot( + final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection); + + final PendingTaskManagerSlot pendingTaskManagerSlot; + + if (allocationId == null) { + pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile); + } else { + pendingTaskManagerSlot = null; + } + + if (pendingTaskManagerSlot == null) { + updateSlot(slotId, allocationId, jobId); + } else { + pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId()); + final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest(); + + if (assignedPendingSlotRequest == null) { + handleFreeSlot(slot); + } else { + assignedPendingSlotRequest.unassignPendingTaskManagerSlot(); + allocateSlot(slot, assignedPendingSlotRequest); + } + } + } + + @Nonnull + private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorConnection taskManagerConnection) { + final TaskManagerSlot slot = new TaskManagerSlot( slotId, resourceProfile, taskManagerConnection); - slots.put(slotId, slot); + return slot; + } - updateSlot(slotId, allocationId, jobId); + @Nullable + private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) { + for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) { + if (pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) { + return pendingTaskManagerSlot; + } + } + + return null; } /** @@ -650,15 +697,56 @@ private void updateSlotState( * @throws ResourceManagerException if the resource manager cannot allocate more resource */ private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException { - TaskManagerSlot taskManagerSlot = findMatchingSlot(pendingSlotRequest.getResourceProfile()); + final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile(); + TaskManagerSlot taskManagerSlot = findMatchingSlot(resourceProfile); if (taskManagerSlot != null) { allocateSlot(taskManagerSlot, pendingSlotRequest); } else { - resourceActions.allocateResource(pendingSlotRequest.getResourceProfile()); + Optional pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile); + + if (!pendingTaskManagerSlotOptional.isPresent()) { + pendingTaskManagerSlotOptional = allocateResource(resourceProfile); + } + + pendingTaskManagerSlotOptional.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot)); + } + } + + private Optional findFreeMatchingPendingTaskManagerSlot(ResourceProfile requiredResourceProfile) { + for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) { + if (pendingTaskManagerSlot.getAssignedPendingSlotRequest() == null && pendingTaskManagerSlot.getResourceProfile().isMatching(requiredResourceProfile)) { + return Optional.of(pendingTaskManagerSlot); + } + } + + return Optional.empty(); + } + + private Optional allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { + final Collection requestedSlots = resourceActions.allocateResource(resourceProfile); + + if (requestedSlots.isEmpty()) { + return Optional.empty(); + } else { + final Iterator slotIterator = requestedSlots.iterator(); + final PendingTaskManagerSlot pendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next()); + pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot); + + while (slotIterator.hasNext()) { + final PendingTaskManagerSlot additionalPendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next()); + pendingSlots.put(additionalPendingTaskManagerSlot.getTaskManagerSlotId(), additionalPendingTaskManagerSlot); + } + + return Optional.of(pendingTaskManagerSlot); } } + private void assignPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest, PendingTaskManagerSlot pendingTaskManagerSlot) { + pendingTaskManagerSlot.assignPendingSlotRequest(pendingSlotRequest); + pendingSlotRequest.assignPendingTaskManagerSlot(pendingTaskManagerSlot); + } + /** * Allocates the given slot for the given slot request. This entails sending a registration * message to the task manager and treating failures. @@ -680,6 +768,8 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest); pendingSlotRequest.setRequestFuture(completableFuture); + returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest); + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID); if (taskManagerRegistration == null) { @@ -733,6 +823,14 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe mainThreadExecutor); } + private void returnPendingTaskManagerSlotIfAssigned(PendingSlotRequest pendingSlotRequest) { + final PendingTaskManagerSlot pendingTaskManagerSlot = pendingSlotRequest.getAssignedPendingTaskManagerSlot(); + if (pendingTaskManagerSlot != null) { + pendingTaskManagerSlot.unassignPendingSlotRequest(); + pendingSlotRequest.unassignPendingTaskManagerSlot(); + } + } + /** * Handles a free slot. It first tries to find a pending slot request which can be fulfilled. * If there is no such request, then it will add the slot to the set of free slots. @@ -886,6 +984,8 @@ private void rejectPendingSlotRequest(PendingSlotRequest pendingSlotRequest, Exc private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) { CompletableFuture request = pendingSlotRequest.getRequestFuture(); + returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest); + if (null != request) { request.cancel(false); } @@ -911,9 +1011,10 @@ private void checkTaskManagerTimeouts() { } // second we trigger the release resource callback which can decide upon the resource release + final FlinkException cause = new FlinkException("TaskExecutor exceeded the idle timeout."); for (InstanceID timedOutTaskManagerId : timedOutTaskManagerIds) { LOG.debug("Release TaskExecutor {} because it exceeded the idle timeout.", timedOutTaskManagerId); - resourceActions.releaseResource(timedOutTaskManagerId, new FlinkException("TaskExecutor exceeded the idle timeout.")); + resourceActions.releaseResource(timedOutTaskManagerId, cause); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java new file mode 100644 index 0000000000000..3084b3e8ed94b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerSlotId.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot; +import org.apache.flink.util.AbstractID; + +/** + * Id of {@link TaskManagerSlot} and {@link PendingTaskManagerSlot}. + */ +public class TaskManagerSlotId extends AbstractID { + + private static final long serialVersionUID = -4024240625523472071L; + + private TaskManagerSlotId() {} + + public static TaskManagerSlotId generate() { + return new TaskManagerSlotId(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 0e98e44ab0d8e..c46d800bc9534 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -246,7 +246,7 @@ public static TaskManagerServices fromConfiguration( final List resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots()); for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) { - resourceProfiles.add(new ResourceProfile(1.0, 42)); + resourceProfiles.add(ResourceProfile.ANY); } final TimerService timerService = new TimerService<>( @@ -259,7 +259,6 @@ public static TaskManagerServices fromConfiguration( final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); - final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories(); final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length]; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java index 0b56231d4a4e2..e8207019a32ce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java @@ -32,6 +32,9 @@ import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Collections; + /** * Simple {@link ResourceManager} implementation for testing purposes. */ @@ -71,8 +74,8 @@ protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nul } @Override - public void startNewWorker(ResourceProfile resourceProfile) { - // noop + public Collection startNewWorker(ResourceProfile resourceProfile) { + return Collections.emptyList(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 8a7f733a3fa96..854d27cdfafcb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -54,7 +55,9 @@ import javax.annotation.Nonnull; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -67,6 +70,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -202,13 +206,16 @@ public void testSlotRequestWithoutFreeSlots() throws Exception { resourceProfile, "localhost"); - ResourceActions resourceManagerActions = mock(ResourceActions.class); + CompletableFuture allocateResourceFuture = new CompletableFuture<>(); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(allocateResourceFuture::complete) + .build(); try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { slotManager.registerSlotRequest(slotRequest); - verify(resourceManagerActions).allocateResource(eq(resourceProfile)); + assertThat(allocateResourceFuture.get(), is(equalTo(resourceProfile))); } } @@ -357,7 +364,10 @@ public void testFulfillingPendingSlotRequest() throws Exception { resourceProfile, targetAddress); - ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger numberAllocateResourceCalls = new AtomicInteger(0); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(ignored -> numberAllocateResourceCalls.incrementAndGet()) + .build(); // accept an incoming slot request final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); @@ -378,7 +388,7 @@ public void testFulfillingPendingSlotRequest() throws Exception { assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest)); - verify(resourceManagerActions, times(1)).allocateResource(eq(resourceProfile)); + assertThat(numberAllocateResourceCalls.get(), is(1)); slotManager.registerTaskManager( taskExecutorConnection, @@ -444,7 +454,10 @@ public void testFreeSlot() throws Exception { @Test public void testDuplicatePendingSlotRequest() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger numberAllocateResourceFunctionCalls = new AtomicInteger(0); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfile -> numberAllocateResourceFunctionCalls.incrementAndGet()) + .build(); final AllocationID allocationId = new AllocationID(); final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); @@ -458,7 +471,7 @@ public void testDuplicatePendingSlotRequest() throws Exception { // check that we have only called the resource allocation only for the first slot request, // since the second request is a duplicate - verify(resourceManagerActions, times(1)).allocateResource(any(ResourceProfile.class)); + assertThat(numberAllocateResourceFunctionCalls.get(), is(1)); } /** @@ -723,7 +736,10 @@ public void run() { public void testSlotRequestTimeout() throws Exception { final long allocationTimeout = 50L; - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final CompletableFuture> failedAllocationFuture = new CompletableFuture<>(); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setNotifyAllocationFailureConsumer(tuple3 -> failedAllocationFuture.complete(Tuple2.of(tuple3.f0, tuple3.f1))) + .build(); final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final JobID jobId = new JobID(); final AllocationID allocationId = new AllocationID(); @@ -743,21 +759,15 @@ public void testSlotRequestTimeout() throws Exception { final AtomicReference atomicException = new AtomicReference<>(null); - mainThreadExecutor.execute(new Runnable() { - @Override - public void run() { - try { - assertTrue(slotManager.registerSlotRequest(slotRequest)); - } catch (Exception e) { - atomicException.compareAndSet(null, e); - } + mainThreadExecutor.execute(() -> { + try { + assertTrue(slotManager.registerSlotRequest(slotRequest)); + } catch (Exception e) { + atomicException.compareAndSet(null, e); } }); - verify(resourceManagerActions, timeout(100L * allocationTimeout).times(1)).notifyAllocationFailure( - eq(jobId), - eq(allocationId), - any(TimeoutException.class)); + assertThat(failedAllocationFuture.get(), is(equalTo(Tuple2.of(jobId, allocationId)))); if (atomicException.get() != null) { throw atomicException.get(); @@ -851,7 +861,7 @@ public void testTaskManagerSlotRequestTimeoutHandling() throws Exception { public void testSlotReportWhileActiveSlotRequest() throws Exception { final long verifyTimeout = 10000L; final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); final JobID jobId = new JobID(); final AllocationID allocationId = new AllocationID(); @@ -969,7 +979,10 @@ public void testTimeoutForUnusedTaskManager() throws Exception { final long verifyTimeout = taskManagerTimeout * 10L; final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final CompletableFuture releasedResourceFuture = new CompletableFuture<>(); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID)) + .build(); final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor(); final ResourceID resourceId = ResourceID.generate(); @@ -1052,7 +1065,7 @@ public void testTimeoutForUnusedTaskManager() throws Exception { assertTrue(idleFuture2.get()); - verify(resourceManagerActions, timeout(verifyTimeout).times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class)); + assertThat(releasedResourceFuture.get(), is(equalTo(taskManagerConnection.getInstanceID()))); } } @@ -1109,7 +1122,7 @@ public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception { @Test public void testReportAllocatedSlot() throws Exception { final ResourceID taskManagerId = ResourceID.generate(); - final ResourceActions resourceActions = mock(ResourceActions.class); + final ResourceActions resourceActions = new TestingResourceActionsBuilder().build(); final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, taskExecutorGateway); @@ -1167,7 +1180,7 @@ public void testReportAllocatedSlot() throws Exception { @Test public void testSlotRequestFailure() throws Exception { try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), - new TestingResourceActionsBuilder().createTestingResourceActions())) { + new TestingResourceActionsBuilder().build())) { final SlotRequest slotRequest = new SlotRequest(new JobID(), new AllocationID(), ResourceProfile.UNKNOWN, "foobar"); slotManager.registerSlotRequest(slotRequest); @@ -1222,7 +1235,7 @@ public void testSlotRequestFailure() throws Exception { @Test public void testSlotRequestRemovedIfTMReportAllocation() throws Exception { try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), - new TestingResourceActionsBuilder().createTestingResourceActions())) { + new TestingResourceActionsBuilder().build())) { final JobID jobID = new JobID(); final SlotRequest slotRequest1 = new SlotRequest(jobID, new AllocationID(), ResourceProfile.UNKNOWN, "foobar"); @@ -1293,7 +1306,7 @@ public void testNotifyFailedAllocationWhenTaskManagerTerminated() throws Excepti .setNotifyAllocationFailureConsumer( (Tuple3 failureMessage) -> allocationFailures.offer(Tuple2.of(failureMessage.f0, failureMessage.f1))) - .createTestingResourceActions(); + .build(); try (final SlotManager slotManager = createSlotManager( ResourceManagerId.generate(), @@ -1374,17 +1387,27 @@ private Set extractFailedAllocationsForJob(JobID jobId2, Map slotStatusSet = new HashSet<>(numberSlots); for (int i = 0; i < numberSlots; i++) { - slotStatusSet.add(new SlotStatus(new SlotID(taskExecutorResourceId, i), ResourceProfile.UNKNOWN)); + slotStatusSet.add(new SlotStatus(new SlotID(taskExecutorResourceId, i), resourceProfile)); } return new SlotReport(slotStatusSet); } @Nonnull - private SlotRequest createSlotRequest(JobID jobId1) { - return new SlotRequest(jobId1, new AllocationID(), ResourceProfile.UNKNOWN, "foobar1"); + private SlotRequest createSlotRequest(JobID jobId) { + return createSlotRequest(jobId, ResourceProfile.UNKNOWN); + } + + @Nonnull + private SlotRequest createSlotRequest(JobID jobId, ResourceProfile resourceProfile) { + return new SlotRequest(jobId, new AllocationID(), resourceProfile, "foobar1"); } private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceActions resourceManagerActions) { @@ -1398,4 +1421,171 @@ private SlotManager createSlotManager(ResourceManagerId resourceManagerId, Resou return slotManager; } + + /** + * Tests that we only request new resources/containers once we have assigned + * all pending task manager slots. + */ + @Test + public void testRequestNewResources() throws Exception { + final int numberSlots = 2; + final AtomicInteger resourceRequests = new AtomicInteger(0); + final TestingResourceActions testingResourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction( + convert(ignored -> { + resourceRequests.incrementAndGet(); + return numberSlots; + })) + .build(); + + try (final SlotManager slotManager = createSlotManager( + ResourceManagerId.generate(), + testingResourceActions)) { + + final JobID jobId = new JobID(); + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + assertThat(resourceRequests.get(), is(1)); + + // the second slot request should not try to allocate a new resource because the + // previous resource was started with 2 slots. + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + assertThat(resourceRequests.get(), is(1)); + + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(2)); + + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + assertThat(resourceRequests.get(), is(2)); + } + } + + /** + * Tests that a failing allocation/slot request will return the pending task manager slot. + */ + @Test + public void testFailingAllocationReturnsPendingTaskManagerSlot() throws Exception { + final int numberSlots = 2; + final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(convert(value -> numberSlots)) + .build(); + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + final JobID jobId = new JobID(); + + final SlotRequest slotRequest = createSlotRequest(jobId); + assertThat(slotManager.registerSlotRequest(slotRequest), is(true)); + + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1)); + + slotManager.unregisterSlotRequest(slotRequest.getAllocationId()); + + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0)); + } + } + + /** + * Tests the completion of pending task manager slots by registering a TaskExecutor. + */ + @Test + public void testPendingTaskManagerSlotCompletion() throws Exception { + final int numberSlots = 3; + final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(convert(value -> numberSlots)) + .build(); + + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + final JobID jobId = new JobID(); + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1)); + assertThat(slotManager.getNumberRegisteredSlots(), is(0)); + + final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection(); + final SlotReport slotReport = createSlotReport(taskExecutorConnection.getResourceID(), numberSlots - 1); + + slotManager.registerTaskManager(taskExecutorConnection, slotReport); + + assertThat(slotManager.getNumberRegisteredSlots(), is(numberSlots - 1)); + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(1)); + } + } + + private TaskExecutorConnection createTaskExecutorConnection() { + final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); + return new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway); + } + + /** + * Tests that a different slot can fulfill a pending slot request. If the + * pending slot request has a pending task manager slot assigned, it should + * be freed. + */ + @Test + public void testRegistrationOfDifferentSlot() throws Exception { + final int numberSlots = 1; + final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(convert(value -> numberSlots)) + .build(); + + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + final JobID jobId = new JobID(); + final ResourceProfile requestedSlotProfile = new ResourceProfile(1.0, 1); + + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId, requestedSlotProfile)), is(true)); + + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + + final int numberOfferedSlots = 1; + final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection(); + final ResourceProfile offeredSlotProfile = new ResourceProfile(2.0, 2); + final SlotReport slotReport = createSlotReport(taskExecutorConnection.getResourceID(), numberOfferedSlots, offeredSlotProfile); + + slotManager.registerTaskManager(taskExecutorConnection, slotReport); + + assertThat(slotManager.getNumberRegisteredSlots(), is(numberOfferedSlots)); + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(0)); + } + } + + /** + * Tests that only free slots can fulfill/complete a pending task manager slot. + */ + @Test + public void testOnlyFreeSlotsCanFulfillPendingTaskManagerSlot() throws Exception { + final int numberSlots = 1; + final TestingResourceActions resourceActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(convert(value -> numberSlots)) + .build(); + + try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), resourceActions)) { + final JobID jobId = new JobID(); + assertThat(slotManager.registerSlotRequest(createSlotRequest(jobId)), is(true)); + + final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection(); + final SlotID slotId = new SlotID(taskExecutorConnection.getResourceID(), 0); + final SlotStatus slotStatus = new SlotStatus(slotId, ResourceProfile.UNKNOWN, jobId, new AllocationID()); + final SlotReport slotReport = new SlotReport(slotStatus); + + slotManager.registerTaskManager(taskExecutorConnection, slotReport); + + assertThat(slotManager.getNumberRegisteredSlots(), is(1)); + assertThat(slotManager.getNumberPendingTaskManagerSlots(), is(numberSlots)); + assertThat(slotManager.getNumberAssignedPendingTaskManagerSlots(), is(1)); + } + } + + private static FunctionWithException, ResourceManagerException> convert(FunctionWithException function) { + return (ResourceProfile resourceProfile) -> { + final int slots = function.apply(resourceProfile); + + final ArrayList result = new ArrayList<>(slots); + for (int i = 0; i < slots; i++) { + result.add(resourceProfile); + } + + return result; + }; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 8f6317c427f27..51e6b0b89d8bc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -47,6 +47,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -87,7 +90,10 @@ public void testSlotsUnavailableRequest() throws Exception { TestingUtils.infiniteTime(), TestingUtils.infiniteTime())) { - ResourceActions resourceManagerActions = mock(ResourceActions.class); + final CompletableFuture resourceProfileFuture = new CompletableFuture<>(); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfile -> resourceProfileFuture.complete(resourceProfile)) + .build(); slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions); @@ -99,7 +105,7 @@ public void testSlotsUnavailableRequest() throws Exception { slotManager.registerSlotRequest(slotRequest); - verify(resourceManagerActions).allocateResource(eq(slotRequest.getResourceProfile())); + assertThat(resourceProfileFuture.get(), is(equalTo(slotRequest.getResourceProfile()))); // slot becomes available TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java index 8b7c8026f92cb..4c6f14c1c4050 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActions.java @@ -23,9 +23,12 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.function.FunctionWithException; import javax.annotation.Nonnull; +import java.util.Collection; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -38,29 +41,28 @@ public class TestingResourceActions implements ResourceActions { private final BiConsumer releaseResourceConsumer; @Nonnull - private final Consumer allocateResourceConsumer; + private final FunctionWithException, ResourceManagerException> allocateResourceFunction; @Nonnull private final Consumer> notifyAllocationFailureConsumer; public TestingResourceActions( @Nonnull BiConsumer releaseResourceConsumer, - @Nonnull Consumer allocateResourceConsumer, + @Nonnull FunctionWithException, ResourceManagerException> allocateResourceFunction, @Nonnull Consumer> notifyAllocationFailureConsumer) { this.releaseResourceConsumer = releaseResourceConsumer; - this.allocateResourceConsumer = allocateResourceConsumer; + this.allocateResourceFunction = allocateResourceFunction; this.notifyAllocationFailureConsumer = notifyAllocationFailureConsumer; } - @Override public void releaseResource(InstanceID instanceId, Exception cause) { releaseResourceConsumer.accept(instanceId, cause); } @Override - public void allocateResource(ResourceProfile resourceProfile) { - allocateResourceConsumer.accept(resourceProfile); + public Collection allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException { + return allocateResourceFunction.apply(resourceProfile); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java index 2c1d47e8c88fa..ac7afd4283efb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingResourceActionsBuilder.java @@ -23,7 +23,11 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.util.function.FunctionWithException; +import java.util.Collection; +import java.util.Collections; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -32,7 +36,7 @@ */ public class TestingResourceActionsBuilder { private BiConsumer releaseResourceConsumer = (ignoredA, ignoredB) -> {}; - private Consumer allocateResourceConsumer = (ignored) -> {}; + private FunctionWithException, ResourceManagerException> allocateResourceFunction = (ignored) -> Collections.singleton(ResourceProfile.UNKNOWN); private Consumer> notifyAllocationFailureConsumer = (ignored) -> {}; public TestingResourceActionsBuilder setReleaseResourceConsumer(BiConsumer releaseResourceConsumer) { @@ -40,8 +44,16 @@ public TestingResourceActionsBuilder setReleaseResourceConsumer(BiConsumer, ResourceManagerException> allocateResourceFunction) { + this.allocateResourceFunction = allocateResourceFunction; + return this; + } + public TestingResourceActionsBuilder setAllocateResourceConsumer(Consumer allocateResourceConsumer) { - this.allocateResourceConsumer = allocateResourceConsumer; + this.allocateResourceFunction = (ResourceProfile resourceProfile) -> { + allocateResourceConsumer.accept(resourceProfile); + return Collections.singleton(ResourceProfile.UNKNOWN); + }; return this; } @@ -50,7 +62,7 @@ public TestingResourceActionsBuilder setNotifyAllocationFailureConsumer(Consumer return this; } - public TestingResourceActions createTestingResourceActions() { - return new TestingResourceActions(releaseResourceConsumer, allocateResourceConsumer, notifyAllocationFailureConsumer); + public TestingResourceActions build() { + return new TestingResourceActions(releaseResourceConsumer, allocateResourceFunction, notifyAllocationFailureConsumer); } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 956e40fe61b6e..2a43b8b7f1a60 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -58,6 +58,7 @@ import javax.annotation.Nullable; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -115,6 +116,8 @@ public class YarnResourceManager extends ResourceManager impleme private final Map resourcePriorities = new HashMap<>(); + private final Collection slotsPerWorker; + public YarnResourceManager( RpcService rpcService, String resourceManagerEndpointId, @@ -163,6 +166,8 @@ public YarnResourceManager( this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes(); this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots); + + this.slotsPerWorker = createSlotsPerWorker(numberOfTaskSlots); } protected AMRMClientAsync createAndStartResourceManagerClient( @@ -283,7 +288,7 @@ protected void internalDeregisterApplication( } @Override - public void startNewWorker(ResourceProfile resourceProfile) { + public Collection startNewWorker(ResourceProfile resourceProfile) { // Priority for worker containers - priorities are intra-application //TODO: set priority according to the resource allocated Priority priority = Priority.newInstance(generatePriority(resourceProfile)); @@ -291,6 +296,8 @@ public void startNewWorker(ResourceProfile resourceProfile) { int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus : (int) resourceProfile.getCpuCores(); Resource capability = Resource.newInstance(mem, vcore); requestYarnContainer(capability, priority); + + return slotsPerWorker; } @Override @@ -334,7 +341,7 @@ public void onContainersCompleted(final List statuses) { if (yarnWorkerNode != null) { // Container completed unexpectedly ~> start a new one final Container container = yarnWorkerNode.getContainer(); - requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority()); + requestYarnContainerIfRequired(container.getResource(), yarnWorkerNode.getContainer().getPriority()); } // Eagerly close the connection with task manager. closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics())); @@ -375,7 +382,7 @@ public void onContainersAllocated(List containers) { workerNodeMap.remove(resourceId); resourceManagerClient.releaseAssignedContainer(container.getId()); // and ask for a new one - requestYarnContainer(container.getResource(), container.getPriority()); + requestYarnContainerIfRequired(container.getResource(), container.getPriority()); } } else { // return the excessive containers @@ -446,21 +453,26 @@ private static Tuple2 parseHostPort(String address) { /** * Request new container if pending containers cannot satisfies pending slot requests. */ + private void requestYarnContainerIfRequired(Resource resource, Priority priority) { + int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots(); + int pendingTaskManagerSlots = numPendingContainerRequests * numberOfTaskSlots; + + if (requiredTaskManagerSlots > pendingTaskManagerSlots) { + requestYarnContainer(resource, priority); + } + } + private void requestYarnContainer(Resource resource, Priority priority) { - int pendingSlotRequests = getNumberPendingSlotRequests(); - int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots; - if (pendingSlotRequests > pendingSlotAllocation) { - resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority)); + resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority)); - // make sure we transmit the request fast and receive fast news of granted allocations - resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS); + // make sure we transmit the request fast and receive fast news of granted allocations + resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS); - numPendingContainerRequests++; + numPendingContainerRequests++; - log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.", - resource, - numPendingContainerRequests); - } + log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.", + resource, + numPendingContainerRequests); } private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host) From 43a96bb4f56f47f5ba43a94e9d5820922d81505d Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 21 Sep 2018 15:53:52 +0200 Subject: [PATCH 2/5] [hotfix] Cancel actual pending slot request in SlotManager#updateSlotState --- .../runtime/resourcemanager/slotmanager/SlotManager.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 2fa1844e027f3..2ef2b2fefcc36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -642,7 +642,11 @@ private void updateSlotState( slot.updateAllocation(allocationId, jobId); // remove the pending request if any as it has been assigned - pendingSlotRequests.remove(allocationId); + final PendingSlotRequest actualPendingSlotRequest = pendingSlotRequests.remove(allocationId); + + if (actualPendingSlotRequest != null) { + cancelPendingSlotRequest(actualPendingSlotRequest); + } // this will try to find a new slot for the request rejectPendingSlotRequest( From 844a0b6ae073e07d4a0e3e4a0b68ac18504b84b4 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 21 Sep 2018 17:16:32 +0200 Subject: [PATCH 3/5] [hotfix] Remove mocking from SlotManagerTest --- .../slotmanager/SlotManagerTest.java | 160 +++++++----------- 1 file changed, 64 insertions(+), 96 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 854d27cdfafcb..33a696a1d242e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -88,9 +88,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -107,9 +105,9 @@ public class SlotManagerTest extends TestLogger { @Test public void testTaskManagerRegistration() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final ResourceID resourceId = ResourceID.generate(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); @@ -139,14 +137,12 @@ public void testTaskManagerUnregistration() throws Exception { final ResourceActions resourceManagerActions = mock(ResourceActions.class); final JobID jobId = new JobID(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(new CompletableFuture<>()); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + assertThat(tuple5.f4, is(equalTo(resourceManagerId))); + return new CompletableFuture<>(); + }) + .createTestingTaskExecutorGateway(); final ResourceID resourceId = ResourceID.generate(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); @@ -232,8 +228,11 @@ public void testSlotRequestWithResourceAllocationFailure() throws Exception { resourceProfile, "localhost"); - ResourceActions resourceManagerActions = mock(ResourceActions.class); - doThrow(new ResourceManagerException("Test exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class)); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(value -> { + throw new ResourceManagerException("Test exception"); + }) + .build(); try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { @@ -264,19 +263,17 @@ public void testSlotRequestWithFreeSlot() throws Exception { resourceProfile, targetAddress); - ResourceActions resourceManagerActions = mock(ResourceActions.class); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - + final CompletableFuture> requestFuture = new CompletableFuture<>(); // accept an incoming slot request - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - eq(slotId), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4)); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); @@ -289,7 +286,7 @@ public void testSlotRequestWithFreeSlot() throws Exception { assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest)); - verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId)))); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -309,14 +306,9 @@ public void testUnregisterPendingSlotRequest() throws Exception { final SlotID slotId = new SlotID(resourceID, 0); final AllocationID allocationId = new AllocationID(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(new CompletableFuture<>()); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> new CompletableFuture<>()) + .createTestingTaskExecutorGateway(); final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); @@ -369,15 +361,14 @@ public void testFulfillingPendingSlotRequest() throws Exception { .setAllocateResourceConsumer(ignored -> numberAllocateResourceCalls.incrementAndGet()) .build(); + final CompletableFuture> requestFuture = new CompletableFuture<>(); // accept an incoming slot request - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - eq(slotId), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4)); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); @@ -394,7 +385,7 @@ public void testFulfillingPendingSlotRequest() throws Exception { taskExecutorConnection, slotReport); - verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId)))); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -510,21 +501,17 @@ public void testDuplicatePendingSlotRequestAfterSlotReport() throws Exception { @Test public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger allocateResourceCalls = new AtomicInteger(0); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet()) + .build(); final AllocationID allocationId = new AllocationID(); final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar"); final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo"); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final ResourceID resourceID = ResourceID.generate(); @@ -547,7 +534,7 @@ public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Ex // check that we have only called the resource allocation only for the first slot request, // since the second request is a duplicate - verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class)); + assertThat(allocateResourceCalls.get(), is(0)); } /** @@ -557,21 +544,17 @@ public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Ex @Test public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger allocateResourceCalls = new AtomicInteger(0); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet()) + .build(); final AllocationID allocationId = new AllocationID(); final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar"); final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo"); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final ResourceID resourceID = ResourceID.generate(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); @@ -601,7 +584,7 @@ public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exc // check that we have only called the resource allocation only for the first slot request, // since the second request is a duplicate - verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class)); + assertThat(allocateResourceCalls.get(), is(0)); } /** @@ -637,7 +620,7 @@ public void testReceivingUnknownSlotReport() throws Exception { @Test public void testUpdateSlotReport() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); final JobID jobId = new JobID(); final AllocationID allocationId = new AllocationID(); @@ -691,13 +674,16 @@ public void testUpdateSlotReport() throws Exception { */ @Test public void testTaskManagerTimeout() throws Exception { - final long tmTimeout = 500L; + final long tmTimeout = 10L; - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final CompletableFuture releaseFuture = new CompletableFuture<>(); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID)) + .build(); final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final ResourceID resourceID = ResourceID.generate(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); final SlotID slotId = new SlotID(resourceID, 0); @@ -715,15 +701,9 @@ public void testTaskManagerTimeout() throws Exception { slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions); - mainThreadExecutor.execute(new Runnable() { - @Override - public void run() { - slotManager.registerTaskManager(taskManagerConnection, slotReport); - } - }); + mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport)); - verify(resourceManagerActions, timeout(100L * tmTimeout).times(1)) - .releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class)); + assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID()))); } } @@ -976,13 +956,12 @@ public void testSlotReportWhileActiveSlotRequest() throws Exception { @Test public void testTimeoutForUnusedTaskManager() throws Exception { final long taskManagerTimeout = 50L; - final long verifyTimeout = taskManagerTimeout * 10L; - final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final CompletableFuture releasedResourceFuture = new CompletableFuture<>(); final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() .setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID)) .build(); + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor(); final ResourceID resourceId = ResourceID.generate(); @@ -992,14 +971,13 @@ public void testTimeoutForUnusedTaskManager() throws Exception { final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar"); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final CompletableFuture requestedSlotFuture = new CompletableFuture<>(); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestedSlotFuture.complete(tuple5.f0); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); @@ -1028,17 +1006,9 @@ public void testTimeoutForUnusedTaskManager() throws Exception { } }, mainThreadExecutor) - .thenAccept((Object value) -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport)); - - ArgumentCaptor slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class); + .thenRun(() -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport)); - verify(taskExecutorGateway, timeout(verifyTimeout)).requestSlot( - slotIdArgumentCaptor.capture(), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class)); + final SlotID slotId = requestedSlotFuture.get(); CompletableFuture idleFuture = CompletableFuture.supplyAsync( () -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()), @@ -1047,8 +1017,6 @@ public void testTimeoutForUnusedTaskManager() throws Exception { // check that the TaskManager is not idle assertFalse(idleFuture.get()); - final SlotID slotId = slotIdArgumentCaptor.getValue(); - CompletableFuture slotFuture = CompletableFuture.supplyAsync( () -> slotManager.getSlot(slotId), mainThreadExecutor); From 6ba4563066c5adfc5f737b93b37715b7bc080298 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 21 Sep 2018 17:24:49 +0200 Subject: [PATCH 4/5] [hotfix] Remove mocking from SlotProtocolTest --- .../slotmanager/SlotProtocolTest.java | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 51e6b0b89d8bc..66966cc3efff3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -33,13 +33,13 @@ import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; -import org.mockito.Mockito; import java.util.Collections; import java.util.concurrent.CompletableFuture; @@ -50,12 +50,10 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; +/** + * Tests for the slot allocation protocol. + */ public class SlotProtocolTest extends TestLogger { private static final long timeout = 10000L; @@ -92,7 +90,7 @@ public void testSlotsUnavailableRequest() throws Exception { final CompletableFuture resourceProfileFuture = new CompletableFuture<>(); ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() - .setAllocateResourceConsumer(resourceProfile -> resourceProfileFuture.complete(resourceProfile)) + .setAllocateResourceConsumer(resourceProfileFuture::complete) .build(); slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions); @@ -108,11 +106,13 @@ public void testSlotsUnavailableRequest() throws Exception { assertThat(resourceProfileFuture.get(), is(equalTo(slotRequest.getResourceProfile()))); // slot becomes available - TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - Mockito.when( - taskExecutorGateway - .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class))) - .thenReturn(mock(CompletableFuture.class)); + final CompletableFuture> requestFuture = new CompletableFuture<>(); + TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2)); + return new CompletableFuture<>(); + }) + .createTestingTaskExecutorGateway(); final ResourceID resourceID = ResourceID.generate(); final SlotID slotID = new SlotID(resourceID, 0); @@ -125,8 +125,7 @@ public void testSlotsUnavailableRequest() throws Exception { slotManager.registerTaskManager(new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport); // 4) Slot becomes available and TaskExecutor gets a SlotRequest - verify(taskExecutorGateway, timeout(5000L)) - .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID)))); } } @@ -143,11 +142,13 @@ public void testSlotAvailableRequest() throws Exception { final ResourceManagerId rmLeaderID = ResourceManagerId.generate(); - TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - Mockito.when( - taskExecutorGateway - .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class))) - .thenReturn(mock(CompletableFuture.class)); + final CompletableFuture> requestFuture = new CompletableFuture<>(); + TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple3.of(tuple5.f0, tuple5.f1, tuple5.f2)); + return new CompletableFuture<>(); + }) + .createTestingTaskExecutorGateway(); try (SlotManager slotManager = new SlotManager( scheduledExecutor, @@ -155,7 +156,7 @@ public void testSlotAvailableRequest() throws Exception { TestingUtils.infiniteTime(), TestingUtils.infiniteTime())) { - ResourceActions resourceManagerActions = mock(ResourceActions.class); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions); @@ -178,8 +179,7 @@ public void testSlotAvailableRequest() throws Exception { slotManager.registerSlotRequest(slotRequest); // a SlotRequest is routed to the TaskExecutor - verify(taskExecutorGateway, timeout(5000)) - .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple3.of(slotID, jobID, allocationID)))); } } } From 1ac733678ea5704b5fc13896ebb93897a974022d Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Sat, 22 Sep 2018 14:13:42 +0200 Subject: [PATCH 5/5] [hotfix] Start MesosWorkers with default ContaineredTaskManagerConfiguration --- .../mesos/runtime/clusterframework/MesosResourceManager.java | 4 ++-- .../runtime/clusterframework/MesosResourceManagerTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java index e7a5c98791f44..d826773520988 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java @@ -357,7 +357,7 @@ private void recoverWorkers(final List tasksFromPreviou switch(worker.state()) { case Launched: workersInLaunch.put(extractResourceID(worker.taskID()), worker); - final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), worker.profile()); + final LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID()); toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get())); break; case Released: @@ -439,7 +439,7 @@ public Collection startNewWorker(ResourceProfile resourceProfil workerStore.putWorker(worker); workersInNew.put(extractResourceID(worker.taskID()), worker); - LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID(), resourceProfile); + LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID()); LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).", launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs()); diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index e21f0fc33748c..5163724ed6dd8 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -242,7 +242,7 @@ static class Context implements AutoCloseable { TestingMesosResourceManager resourceManager; // domain objects for test purposes - final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 1); + final ResourceProfile resourceProfile1 = ResourceProfile.UNKNOWN; Protos.FrameworkID framework1 = Protos.FrameworkID.newBuilder().setValue("framework1").build(); public Protos.SlaveID slave1 = Protos.SlaveID.newBuilder().setValue("slave1").build();