Skip to content

Commit

Permalink
[FLINK-20864][runtime] Introduce the DEFAULT resource profile for the…
Browse files Browse the repository at this point in the history
… default slot size and fix the relevant matching logic
  • Loading branch information
KarmaGYZ committed Jan 12, 2021
1 parent 460e179 commit 0f42dff
Show file tree
Hide file tree
Showing 20 changed files with 344 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public ResourceProfile getAvailableBudget() {

public boolean reserve(final ResourceProfile reservation) {
checkResourceProfileNotNullOrUnknown(reservation);
if (!availableBudget.isMatching(reservation)) {
if (!availableBudget.isBiggerThan(reservation)) {
return false;
}

Expand All @@ -61,7 +61,7 @@ public boolean reserve(final ResourceProfile reservation) {
public boolean release(final ResourceProfile reservation) {
checkResourceProfileNotNullOrUnknown(reservation);
ResourceProfile newAvailableBudget = availableBudget.merge(reservation);
if (!totalBudget.isMatching(newAvailableBudget)) {
if (!totalBudget.isBiggerThan(newAvailableBudget)) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Describe the immutable resource profile of the slot, either when requiring or offering it. The
Expand Down Expand Up @@ -67,6 +68,20 @@ public class ResourceProfile implements Serializable {
*/
public static final ResourceProfile UNKNOWN = new ResourceProfile();

/**
* A ResourceProfile that indicates an default resource capacity. This is only be used for
* describing resource capacity of a default task manager slot. This profile should be only used
* in matching logic.
*/
public static final ResourceProfile DEFAULT =
newBuilder()
.setCpuCores(Double.MAX_VALUE)
.setTaskHeapMemory(MemorySize.ZERO)
.setTaskOffHeapMemory(MemorySize.MAX_VALUE)
.setManagedMemory(MemorySize.ZERO)
.setNetworkMemory(MemorySize.MAX_VALUE)
.build();

/**
* A ResourceProfile that indicates infinite resource that matches any resource requirement, for
* testability purpose only.
Expand Down Expand Up @@ -166,7 +181,7 @@ private ResourceProfile() {
* @return The cpu cores, 1.0 means a full cpu thread
*/
public Resource getCpuCores() {
throwUnsupportedOperationExecptionIfUnknown();
throwUnsupportedOperationExecptionIfUnknownOrDefault();
return cpuCores;
}

Expand All @@ -176,7 +191,7 @@ public Resource getCpuCores() {
* @return The task heap memory
*/
public MemorySize getTaskHeapMemory() {
throwUnsupportedOperationExecptionIfUnknown();
throwUnsupportedOperationExecptionIfUnknownOrDefault();
return taskHeapMemory;
}

Expand All @@ -186,7 +201,7 @@ public MemorySize getTaskHeapMemory() {
* @return The task off-heap memory
*/
public MemorySize getTaskOffHeapMemory() {
throwUnsupportedOperationExecptionIfUnknown();
throwUnsupportedOperationExecptionIfUnknownOrDefault();
return taskOffHeapMemory;
}

Expand All @@ -196,7 +211,7 @@ public MemorySize getTaskOffHeapMemory() {
* @return The managed memory
*/
public MemorySize getManagedMemory() {
throwUnsupportedOperationExecptionIfUnknown();
throwUnsupportedOperationExecptionIfUnknownOrDefault();
return managedMemory;
}

Expand All @@ -206,7 +221,7 @@ public MemorySize getManagedMemory() {
* @return The network memory
*/
public MemorySize getNetworkMemory() {
throwUnsupportedOperationExecptionIfUnknown();
throwUnsupportedOperationExecptionIfUnknownOrDefault();
return networkMemory;
}

Expand All @@ -216,7 +231,7 @@ public MemorySize getNetworkMemory() {
* @return The total memory
*/
public MemorySize getTotalMemory() {
throwUnsupportedOperationExecptionIfUnknown();
throwUnsupportedOperationExecptionIfUnknownOrDefault();
return getOperatorsMemory().add(networkMemory);
}

Expand All @@ -226,7 +241,7 @@ public MemorySize getTotalMemory() {
* @return The operator memory
*/
public MemorySize getOperatorsMemory() {
throwUnsupportedOperationExecptionIfUnknown();
throwUnsupportedOperationExecptionIfUnknownOrDefault();
return taskHeapMemory.add(taskOffHeapMemory).add(managedMemory);
}

Expand All @@ -236,12 +251,12 @@ public MemorySize getOperatorsMemory() {
* @return The extended resources
*/
public Map<String, Resource> getExtendedResources() {
throwUnsupportedOperationExecptionIfUnknown();
throwUnsupportedOperationExecptionIfUnknownOrDefault();
return Collections.unmodifiableMap(extendedResources);
}

private void throwUnsupportedOperationExecptionIfUnknown() {
if (this.equals(UNKNOWN)) {
private void throwUnsupportedOperationExecptionIfUnknownOrDefault() {
if (this.equals(UNKNOWN) || this.equals(DEFAULT)) {
throw new UnsupportedOperationException();
}
}
Expand All @@ -263,21 +278,48 @@ public boolean isMatching(final ResourceProfile required) {
return true;
}

if (this.equals(DEFAULT) && required.equals(UNKNOWN)) {
return true;
}

return false;
}

/**
* Check whether this resource profile is bigger than the given resource profile.
*
* @param other the other resource profile
* @return true if this resource profile is bigger, otherwise false
*/
public boolean isBiggerThan(final ResourceProfile other) {
checkNotNull(other, "Cannot compare null resources");
checkState(
!this.equals(DEFAULT) && !other.equals(DEFAULT),
"Cannot compare DEFAULT resources");

if (this.equals(ANY)) {
return true;
}

if (this.equals(other)) {
return true;
}

if (this.equals(UNKNOWN)) {
return false;
}

if (required.equals(UNKNOWN)) {
if (other.equals(UNKNOWN)) {
return true;
}

if (cpuCores.getValue().compareTo(required.cpuCores.getValue()) >= 0
&& taskHeapMemory.compareTo(required.taskHeapMemory) >= 0
&& taskOffHeapMemory.compareTo(required.taskOffHeapMemory) >= 0
&& managedMemory.compareTo(required.managedMemory) >= 0
&& networkMemory.compareTo(required.networkMemory) >= 0) {
if (cpuCores.getValue().compareTo(other.cpuCores.getValue()) >= 0
&& taskHeapMemory.compareTo(other.taskHeapMemory) >= 0
&& taskOffHeapMemory.compareTo(other.taskOffHeapMemory) >= 0
&& managedMemory.compareTo(other.managedMemory) >= 0
&& networkMemory.compareTo(other.networkMemory) >= 0) {

for (Map.Entry<String, Resource> resource : required.extendedResources.entrySet()) {
for (Map.Entry<String, Resource> resource : other.extendedResources.entrySet()) {
if (!extendedResources.containsKey(resource.getKey())
|| extendedResources
.get(resource.getKey())
Expand Down Expand Up @@ -330,6 +372,8 @@ public boolean equals(Object obj) {
@Nonnull
public ResourceProfile merge(final ResourceProfile other) {
checkNotNull(other, "Cannot merge with null resources");
checkState(
!this.equals(DEFAULT) && !other.equals(DEFAULT), "Cannot merge DEFAULT resources.");

if (equals(ANY) || other.equals(ANY)) {
return ANY;
Expand Down Expand Up @@ -366,6 +410,9 @@ public ResourceProfile merge(final ResourceProfile other) {
*/
public ResourceProfile subtract(final ResourceProfile other) {
checkNotNull(other, "Cannot subtract with null resources");
checkState(
!this.equals(DEFAULT) && !other.equals(DEFAULT),
"Cannot subtract DEFAULT resources.");

if (equals(ANY) || other.equals(ANY)) {
return ANY;
Expand All @@ -376,7 +423,8 @@ public ResourceProfile subtract(final ResourceProfile other) {
}

checkArgument(
isMatching(other), "Try to subtract an unmatched resource profile from this one.");
isBiggerThan(other),
"Try to subtract an unmatched resource profile from this one.");

Map<String, Resource> resultExtendedResource = new HashMap<>(extendedResources);

Expand Down Expand Up @@ -404,6 +452,7 @@ public ResourceProfile subtract(final ResourceProfile other) {
@Nonnull
public ResourceProfile multiply(final int multiplier) {
checkArgument(multiplier >= 0, "multiplier must be >= 0");
checkArgument(!this.equals(DEFAULT), "Cannot multiply DEFAULT resources.");
if (equals(ANY)) {
return ANY;
}
Expand Down Expand Up @@ -436,6 +485,10 @@ public String toString() {
return "ResourceProfile{ANY}";
}

if (this.equals(DEFAULT)) {
return "ResourceProfile{DEFAULT}";
}

final StringBuilder extendedResourceStr = new StringBuilder(extendedResources.size() * 10);
for (Map.Entry<String, Resource> resource : extendedResources.entrySet()) {
extendedResourceStr
Expand Down Expand Up @@ -483,6 +536,10 @@ private Object readResolve() {
return ANY;
}

if (this.equals(DEFAULT)) {
return DEFAULT;
}

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,24 +276,8 @@ public PhysicalSlot reserveFreeSlot(
allocationId,
allocatedSlot.getResourceProfile(),
requiredSlotProfile);

ResourceProfile previouslyMatchedResourceProfile =
Preconditions.checkNotNull(slotToRequirementProfileMappings.get(allocationId));

if (!previouslyMatchedResourceProfile.equals(requiredSlotProfile)) {
// slots can be reserved for a requirement that is not in line with the mapping we
// computed when the slot was
// offered, so we have to update the mapping adjust the requirements accordingly to
// ensure we still request enough slots to
// be able to fulfill the total requirements
LOG.debug(
"Adjusting requirements because a slot was reserved for a different requirement than initially assumed. Slot={} assumedRequirement={} actualRequirement={}",
allocationId,
previouslyMatchedResourceProfile,
requiredSlotProfile);
updateSlotToRequirementProfileMapping(allocationId, requiredSlotProfile);
adjustRequirements(previouslyMatchedResourceProfile, requiredSlotProfile);
}
Preconditions.checkState(
requiredSlotProfile.equals(slotToRequirementProfileMappings.get(allocationId)));

return allocatedSlot;
}
Expand All @@ -319,30 +303,6 @@ public ResourceCounter freeReservedSlot(
return previouslyFulfilledRequirement.orElseGet(ResourceCounter::empty);
}

private void updateSlotToRequirementProfileMapping(
AllocationID allocationId, ResourceProfile matchedResourceProfile) {
final ResourceProfile oldResourceProfile =
Preconditions.checkNotNull(
slotToRequirementProfileMappings.put(allocationId, matchedResourceProfile),
"Expected slot profile matching to be non-empty.");

fulfilledResourceRequirements =
fulfilledResourceRequirements.add(matchedResourceProfile, 1);
fulfilledResourceRequirements =
fulfilledResourceRequirements.subtract(oldResourceProfile, 1);
}

private void adjustRequirements(
ResourceProfile oldResourceProfile, ResourceProfile newResourceProfile) {
// slots can be reserved for a requirement that is not in line with the mapping we computed
// when the slot was
// offered, so we have to adjust the requirements accordingly to ensure we still request
// enough slots to
// be able to fulfill the total requirements
decreaseResourceRequirementsBy(ResourceCounter.withResource(newResourceProfile, 1));
increaseResourceRequirementsBy(ResourceCounter.withResource(oldResourceProfile, 1));
}

@Override
public void registerNewSlotsListener(NewSlotsListener newSlotsListener) {
Preconditions.checkState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ private Optional<PendingTaskManagerSlot> allocateResource(
return Optional.empty();
}

if (!defaultSlotResourceProfile.isMatching(requestedSlotResourceProfile)) {
if (!defaultSlotResourceProfile.isBiggerThan(requestedSlotResourceProfile)) {
// requested resource profile is unfulfillable
return Optional.empty();
}
Expand All @@ -1067,7 +1067,7 @@ private Optional<PendingTaskManagerSlot> allocateResource(

PendingTaskManagerSlot pendingTaskManagerSlot = null;
for (int i = 0; i < numSlotsPerWorker; ++i) {
pendingTaskManagerSlot = new PendingTaskManagerSlot(defaultSlotResourceProfile);
pendingTaskManagerSlot = new PendingTaskManagerSlot(ResourceProfile.DEFAULT);
pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public Optional<ResourceRequirement> allocateWorker(
return Optional.empty();
}

if (!defaultSlotResourceProfile.isMatching(requestedSlotResourceProfile)) {
if (!defaultSlotResourceProfile.isBiggerThan(requestedSlotResourceProfile)) {
// requested resource profile is unfulfillable
return Optional.empty();
}
Expand All @@ -278,7 +278,7 @@ public Optional<ResourceRequirement> allocateWorker(

for (int i = 0; i < numSlotsPerWorker; ++i) {
PendingTaskManagerSlot pendingTaskManagerSlot =
new PendingTaskManagerSlot(defaultSlotResourceProfile);
new PendingTaskManagerSlot(ResourceProfile.DEFAULT);
pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public Builder withCoLocationConstraint(final CoLocationConstraint coLocationCon

public ExecutionVertexSchedulingRequirements build() {
checkState(
physicalSlotResourceProfile.isMatching(taskResourceProfile),
physicalSlotResourceProfile.isBiggerThan(taskResourceProfile),
"The physical slot resources must fulfill the task slot requirements");

return new ExecutionVertexSchedulingRequirements(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,17 @@ public class TaskSlot<T extends TaskSlotPayload> implements AutoCloseableAsync {
/** {@link Executor} for background actions, e.g. verify all managed memory released. */
private final Executor asyncExecutor;

/** Whether the task slot is allocated with default resource profile. */
private final boolean isDefault;

public TaskSlot(
final int index,
final ResourceProfile resourceProfile,
final int memoryPageSize,
final JobID jobId,
final AllocationID allocationId,
final Executor asyncExecutor) {
final Executor asyncExecutor,
final boolean isDefault) {

this.index = index;
this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
Expand All @@ -107,6 +111,7 @@ public TaskSlot(

this.jobId = jobId;
this.allocationId = allocationId;
this.isDefault = isDefault;

this.memoryManager = createMemoryManager(resourceProfile, memoryPageSize);

Expand All @@ -125,6 +130,10 @@ public ResourceProfile getResourceProfile() {
return resourceProfile;
}

public ResourceProfile getReportResourceProfile() {
return isDefault ? ResourceProfile.DEFAULT : resourceProfile;
}

public JobID getJobId() {
return jobId;
}
Expand All @@ -141,6 +150,10 @@ public boolean isEmpty() {
return tasks.isEmpty();
}

public boolean isDefault() {
return isDefault;
}

public boolean isActive(JobID activeJobId, AllocationID activeAllocationId) {
Preconditions.checkNotNull(activeJobId);
Preconditions.checkNotNull(activeAllocationId);
Expand Down

0 comments on commit 0f42dff

Please sign in to comment.