Skip to content

Commit

Permalink
[FLINK-21177][runtime] Support limiting max total resources for fine-…
Browse files Browse the repository at this point in the history
…grained resource management.

This closes #15145
  • Loading branch information
xintongsong committed Mar 15, 2021
1 parent 2c30866 commit c775463
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,32 @@ public class ResourceManagerOptions {
+ "for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take "
+ "effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");

@Documentation.ExcludeFromDocumentation(
"This is only needed by FinGrainedSlotManager, which it still in development.")
public static final ConfigOption<Double> MAX_TOTAL_CPU =
ConfigOptions.key("slotmanager.max-total-resource.cpu")
.doubleType()
.noDefaultValue()
.withDescription(
"Maximum cpu cores the Flink cluster allocates for slots. Resources "
+ "for JobManager and TaskManager framework are excluded. If "
+ "not configured, it will be derived from '"
+ MAX_SLOT_NUM.key()
+ "'.");

@Documentation.ExcludeFromDocumentation(
"This is only needed by FinGrainedSlotManager, which it still in development.")
public static final ConfigOption<MemorySize> MAX_TOTAL_MEM =
ConfigOptions.key("slotmanager.max-total-resource.memory")
.memoryType()
.noDefaultValue()
.withDescription(
"Maximum memory size the Flink cluster allocates for slots. Resources "
+ "for JobManager and TaskManager framework are excluded. If "
+ "not configured, it will be derived from '"
+ MAX_SLOT_NUM.key()
+ "'.");

/**
* The number of redundant task managers. Redundant task managers are extra task managers
* started by Flink, in order to speed up job recovery in case of failures due to task manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
Expand Down Expand Up @@ -88,6 +90,9 @@ public class FineGrainedSlotManager implements SlotManager {
*/
private final boolean waitResultConsumedBeforeRelease;

private final CPUResource maxTotalCpu;
private final MemorySize maxTotalMem;

private boolean sendNotEnoughResourceNotifications = true;

private final Set<JobID> unfulfillableJobs = new HashSet<>();
Expand Down Expand Up @@ -133,6 +138,9 @@ public FineGrainedSlotManager(
this.slotStatusSyncer = Preconditions.checkNotNull(slotStatusSyncer);
this.resourceAllocationStrategy = Preconditions.checkNotNull(resourceAllocationStrategy);

this.maxTotalCpu = Preconditions.checkNotNull(slotManagerConfiguration.getMaxTotalCpu());
this.maxTotalMem = Preconditions.checkNotNull(slotManagerConfiguration.getMaxTotalMem());

resourceManagerId = null;
resourceActions = null;
mainThreadExecutor = null;
Expand Down Expand Up @@ -299,24 +307,41 @@ public boolean registerTaskManager(
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
return false;
} else {
Optional<PendingTaskManagerId> matchedPendingTaskManagerOptional =
initialSlotReport.hasAllocatedSlot()
? Optional.empty()
: findMatchingPendingTaskManager(
totalResourceProfile, defaultSlotResourceProfile);

if (!matchedPendingTaskManagerOptional.isPresent()
&& isMaxTotalResourceExceededAfterAdding(totalResourceProfile)) {
LOG.info(
"Releasing task manager {}. The max total resource limitation <{}, {}> is reached.",
taskExecutorConnection.getResourceID(),
maxTotalCpu,
maxTotalMem.toHumanReadableString());
resourceActions.releaseResource(
taskExecutorConnection.getInstanceID(),
new FlinkException("The max total resource limitation is reached."));
return false;
}

taskManagerTracker.addTaskManager(
taskExecutorConnection, totalResourceProfile, defaultSlotResourceProfile);

if (initialSlotReport.hasAllocatedSlot()) {
slotStatusSyncer.reportSlotStatus(
taskExecutorConnection.getInstanceID(), initialSlotReport);
} else {
Optional<PendingTaskManagerId> matchedPendingTaskManagerOptional =
findMatchingPendingTaskManager(
totalResourceProfile, defaultSlotResourceProfile);
if (matchedPendingTaskManagerOptional.isPresent()) {
PendingTaskManagerId pendingTaskManager =
matchedPendingTaskManagerOptional.get();
allocateSlotsForRegisteredPendingTaskManager(
pendingTaskManager, taskExecutorConnection.getInstanceID());
taskManagerTracker.removePendingTaskManager(pendingTaskManager);
return true;
}
}

if (matchedPendingTaskManagerOptional.isPresent()) {
PendingTaskManagerId pendingTaskManager = matchedPendingTaskManagerOptional.get();
allocateSlotsForRegisteredPendingTaskManager(
pendingTaskManager, taskExecutorConnection.getInstanceID());
taskManagerTracker.removePendingTaskManager(pendingTaskManager);
return true;
}

checkResourceRequirementsWithDelay();
return true;
}
Expand Down Expand Up @@ -667,6 +692,15 @@ private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
}

private boolean allocateResource(PendingTaskManager pendingTaskManager) {
if (isMaxTotalResourceExceededAfterAdding(pendingTaskManager.getTotalResourceProfile())) {
LOG.info(
"Could not allocate {}. Max total resource limitation <{}, {}> is reached.",
pendingTaskManager,
maxTotalCpu,
maxTotalMem.toHumanReadableString());
return false;
}

if (!resourceActions.allocateResource(
WorkerResourceSpec.fromTotalResourceProfile(
pendingTaskManager.getTotalResourceProfile(),
Expand Down Expand Up @@ -697,4 +731,13 @@ private void checkInit() {
Preconditions.checkNotNull(mainThreadExecutor);
Preconditions.checkNotNull(resourceActions);
}

private boolean isMaxTotalResourceExceededAfterAdding(ResourceProfile newResource) {
final ResourceProfile totalResourceAfterAdding =
newResource
.merge(taskManagerTracker.getRegisteredResource())
.merge(taskManagerTracker.getPendingResource());
return totalResourceAfterAdding.getCpuCores().compareTo(maxTotalCpu) > 0
|| totalResourceAfterAdding.getTotalMemory().compareTo(maxTotalMem) > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.flink.runtime.resourcemanager.slotmanager;

import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
Expand All @@ -46,6 +48,8 @@ public class SlotManagerConfiguration {
private final WorkerResourceSpec defaultWorkerResourceSpec;
private final int numSlotsPerWorker;
private final int maxSlotNum;
private final CPUResource maxTotalCpu;
private final MemorySize maxTotalMem;
private final int redundantTaskManagerNum;

public SlotManagerConfiguration(
Expand All @@ -57,6 +61,8 @@ public SlotManagerConfiguration(
WorkerResourceSpec defaultWorkerResourceSpec,
int numSlotsPerWorker,
int maxSlotNum,
CPUResource maxTotalCpu,
MemorySize maxTotalMem,
int redundantTaskManagerNum) {

this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
Expand All @@ -69,6 +75,8 @@ public SlotManagerConfiguration(
Preconditions.checkState(maxSlotNum > 0);
this.numSlotsPerWorker = numSlotsPerWorker;
this.maxSlotNum = maxSlotNum;
this.maxTotalCpu = Preconditions.checkNotNull(maxTotalCpu);
this.maxTotalMem = Preconditions.checkNotNull(maxTotalMem);
Preconditions.checkState(redundantTaskManagerNum >= 0);
this.redundantTaskManagerNum = redundantTaskManagerNum;
}
Expand Down Expand Up @@ -105,6 +113,14 @@ public int getMaxSlotNum() {
return maxSlotNum;
}

public CPUResource getMaxTotalCpu() {
return maxTotalCpu;
}

public MemorySize getMaxTotalMem() {
return maxTotalMem;
}

public int getRedundantTaskManagerNum() {
return redundantTaskManagerNum;
}
Expand Down Expand Up @@ -157,6 +173,8 @@ public static SlotManagerConfiguration fromConfiguration(
defaultWorkerResourceSpec,
numSlotsPerWorker,
maxSlotNum,
getMaxTotalCpu(configuration, defaultWorkerResourceSpec, maxSlotNum),
getMaxTotalMem(configuration, defaultWorkerResourceSpec, maxSlotNum),
redundantTaskManagerNum);
}

Expand All @@ -174,4 +192,40 @@ private static Time getSlotRequestTimeout(final Configuration configuration) {
}
return Time.milliseconds(slotRequestTimeoutMs);
}

private static CPUResource getMaxTotalCpu(
final Configuration configuration,
final WorkerResourceSpec defaultWorkerResourceSpec,
final int maxSlotNum) {
return configuration
.getOptional(ResourceManagerOptions.MAX_TOTAL_CPU)
.map(CPUResource::new)
.orElseGet(
() ->
maxSlotNum == Integer.MAX_VALUE
? new CPUResource(Double.MAX_VALUE)
: (CPUResource)
defaultWorkerResourceSpec
.getCpuCores()
.divide(
defaultWorkerResourceSpec
.getNumSlots())
.multiply(maxSlotNum));
}

private static MemorySize getMaxTotalMem(
final Configuration configuration,
final WorkerResourceSpec defaultWorkerResourceSpec,
final int maxSlotNum) {
return configuration
.getOptional(ResourceManagerOptions.MAX_TOTAL_MEM)
.orElseGet(
() ->
maxSlotNum == Integer.MAX_VALUE
? MemorySize.MAX_VALUE
: defaultWorkerResourceSpec
.getTotalMemSize()
.divide(defaultWorkerResourceSpec.getNumSlots())
.multiply(maxSlotNum));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
Expand Down Expand Up @@ -82,6 +84,8 @@ public void confirmLeadership(UUID leaderId, String leaderAddress) {
WorkerResourceSpec.ZERO,
1,
ResourceManagerOptions.MAX_SLOT_NUM.defaultValue(),
new CPUResource(Double.MAX_VALUE),
MemorySize.MAX_VALUE,
ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM.defaultValue()),
ClusterOptions.isDeclarativeResourceManagementEnabled(configuration),
ClusterOptions.isFineGrainedResourceManagementEnabled(configuration));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.flink.runtime.resourcemanager.slotmanager;

import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
Expand Down Expand Up @@ -149,6 +151,8 @@ public DeclarativeSlotManager build() {
defaultWorkerResourceSpec,
numSlotsPerWorker,
maxSlotNum,
new CPUResource(Double.MAX_VALUE),
MemorySize.MAX_VALUE,
redundantTaskManagerNum);

return new DeclarativeSlotManager(
Expand Down

0 comments on commit c775463

Please sign in to comment.