Skip to content

Commit

Permalink
[FLINK-20864][runtime] Apply exact matching logic for fine-grained re…
Browse files Browse the repository at this point in the history
…source management
  • Loading branch information
KarmaGYZ committed Jan 13, 2021
1 parent 01948b0 commit 1617d36
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public ResourceProfile getAvailableBudget() {

public boolean reserve(final ResourceProfile reservation) {
checkResourceProfileNotNullOrUnknown(reservation);
if (!availableBudget.isMatching(reservation)) {
if (!availableBudget.allFieldsNoLessThan(reservation)) {
return false;
}

Expand All @@ -61,7 +61,7 @@ public boolean reserve(final ResourceProfile reservation) {
public boolean release(final ResourceProfile reservation) {
checkResourceProfileNotNullOrUnknown(reservation);
ResourceProfile newAvailableBudget = availableBudget.merge(reservation);
if (!totalBudget.isMatching(newAvailableBudget)) {
if (!totalBudget.allFieldsNoLessThan(newAvailableBudget)) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ private void throwUnsupportedOperationExecptionIfUnknown() {
*/
public boolean isMatching(final ResourceProfile required) {
checkNotNull(required, "Cannot check matching with null resources");
throwUnsupportedOperationExecptionIfUnknown();

if (this.equals(ANY)) {
return true;
Expand All @@ -263,21 +264,59 @@ public boolean isMatching(final ResourceProfile required) {
return true;
}

if (required.equals(UNKNOWN)) {
return true;
}

return false;
}

/**
* Check whether all fields of this resource profile are no less than the given resource
* profile.
*
* <p>It is not same with the total resource comparison. It return true iff each resource
* field(cpu, task heap memory, managed memory, etc.) is no less than the respective field of
* the given profile.
*
* <p>For example, assume that this profile has 1 core, 50 managed memory and 100 heap memory.
*
* <ol>
* <li>The comparison will return false if the other profile has 2 core, 10 managed memory and
* 1000 heap memory.
* <li>The comparison will return true if the other profile has 1 core, 50 managed memory and
* 150 heap memory.
* </ol>
*
* @param other the other resource profile
* @return true if all fields of this are no less than the other's, otherwise false
*/
public boolean allFieldsNoLessThan(final ResourceProfile other) {
checkNotNull(other, "Cannot compare null resources");

if (this.equals(ANY)) {
return true;
}

if (this.equals(other)) {
return true;
}

if (this.equals(UNKNOWN)) {
return false;
}

if (required.equals(UNKNOWN)) {
if (other.equals(UNKNOWN)) {
return true;
}

if (cpuCores.getValue().compareTo(required.cpuCores.getValue()) >= 0
&& taskHeapMemory.compareTo(required.taskHeapMemory) >= 0
&& taskOffHeapMemory.compareTo(required.taskOffHeapMemory) >= 0
&& managedMemory.compareTo(required.managedMemory) >= 0
&& networkMemory.compareTo(required.networkMemory) >= 0) {
if (cpuCores.getValue().compareTo(other.cpuCores.getValue()) >= 0
&& taskHeapMemory.compareTo(other.taskHeapMemory) >= 0
&& taskOffHeapMemory.compareTo(other.taskOffHeapMemory) >= 0
&& managedMemory.compareTo(other.managedMemory) >= 0
&& networkMemory.compareTo(other.networkMemory) >= 0) {

for (Map.Entry<String, Resource> resource : required.extendedResources.entrySet()) {
for (Map.Entry<String, Resource> resource : other.extendedResources.entrySet()) {
if (!extendedResources.containsKey(resource.getKey())
|| extendedResources
.get(resource.getKey())
Expand Down Expand Up @@ -376,7 +415,8 @@ public ResourceProfile subtract(final ResourceProfile other) {
}

checkArgument(
isMatching(other), "Try to subtract an unmatched resource profile from this one.");
allFieldsNoLessThan(other),
"Try to subtract an unmatched resource profile from this one.");

Map<String, Resource> resultExtendedResource = new HashMap<>(extendedResources);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public Builder withCoLocationConstraint(final CoLocationConstraint coLocationCon

public ExecutionVertexSchedulingRequirements build() {
checkState(
physicalSlotResourceProfile.isMatching(taskResourceProfile),
physicalSlotResourceProfile.allFieldsNoLessThan(taskResourceProfile),
"The physical slot resources must fulfill the task slot requirements");

return new ExecutionVertexSchedulingRequirements(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@ public boolean allocateSlot(
// transfer the index to an increasing number not less than the numberSlots.
int index = requestedIndex < 0 ? nextDynamicSlotIndex() : requestedIndex;
ResourceProfile effectiveResourceProfile =
isDynamicIndex(index) ? resourceProfile : defaultSlotResourceProfile;
resourceProfile.equals(ResourceProfile.UNKNOWN)
? defaultSlotResourceProfile
: resourceProfile;

TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
if (taskSlot != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ private Matcher<SlotSelectionStrategy.SlotInfoAndResources> withSlotInfo(SlotInf
public void matchPreferredLocation() {

SlotProfile slotProfile =
SlotProfile.preferredLocality(resourceProfile, Collections.singletonList(tml2));
SlotProfile.preferredLocality(
biggerResourceProfile, Collections.singletonList(tml2));
Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile);

Assert.assertEquals(slotInfo2, match.get().getSlotInfo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void matchPreviousLocationNotAvailableAndAllOthersBlacklisted() {
@Test
public void matchPreviousLocationNotAvailableAndSomeOthersBlacklisted() {
HashSet<AllocationID> blacklisted = new HashSet<>(3);
blacklisted.add(aid1);
blacklisted.add(aid2);
blacklisted.add(aid3);
blacklisted.add(aid4);
SlotProfile slotProfile =
Expand All @@ -114,6 +114,6 @@ public void matchPreviousLocationNotAvailableAndSomeOthersBlacklisted() {
Optional<SlotSelectionStrategy.SlotInfoAndLocality> match = runMatching(slotProfile);

// we expect that the candidate that is not blacklisted is returned
Assert.assertEquals(slotInfo2, match.get().getSlotInfo());
Assert.assertEquals(slotInfo1, match.get().getSlotInfo());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ResourceProfileTest extends TestLogger {
MAX_MEMORY_SIZE_TO_LOG.add(MemorySize.ofMebiBytes(10));

@Test
public void testMatchRequirement() {
public void testAllFieldsNoLessThanProfile() {
final ResourceProfile rp1 =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
Expand Down Expand Up @@ -80,19 +80,19 @@ public void testMatchRequirement() {
.setManagedMemoryMB(200)
.build();

assertFalse(rp1.isMatching(rp2));
assertTrue(rp2.isMatching(rp1));
assertFalse(rp1.allFieldsNoLessThan(rp2));
assertTrue(rp2.allFieldsNoLessThan(rp1));

assertFalse(rp1.isMatching(rp3));
assertTrue(rp3.isMatching(rp1));
assertFalse(rp1.allFieldsNoLessThan(rp3));
assertTrue(rp3.allFieldsNoLessThan(rp1));

assertFalse(rp2.isMatching(rp3));
assertFalse(rp3.isMatching(rp2));
assertFalse(rp2.allFieldsNoLessThan(rp3));
assertFalse(rp3.allFieldsNoLessThan(rp2));

assertTrue(rp4.isMatching(rp1));
assertTrue(rp4.isMatching(rp2));
assertTrue(rp4.isMatching(rp3));
assertTrue(rp4.isMatching(rp4));
assertTrue(rp4.allFieldsNoLessThan(rp1));
assertTrue(rp4.allFieldsNoLessThan(rp2));
assertTrue(rp4.allFieldsNoLessThan(rp3));
assertTrue(rp4.allFieldsNoLessThan(rp4));

final ResourceProfile rp5 =
ResourceProfile.newBuilder()
Expand All @@ -102,23 +102,66 @@ public void testMatchRequirement() {
.setManagedMemoryMB(100)
.setNetworkMemoryMB(100)
.build();
assertFalse(rp4.isMatching(rp5));
assertFalse(rp4.allFieldsNoLessThan(rp5));

ResourceSpec rs1 = ResourceSpec.newBuilder(1.0, 100).setGPUResource(2.2).build();
ResourceSpec rs2 = ResourceSpec.newBuilder(1.0, 100).setGPUResource(1.1).build();

assertFalse(rp1.isMatching(ResourceProfile.fromResourceSpec(rs1)));
assertFalse(rp1.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs1)));
assertTrue(
ResourceProfile.fromResourceSpec(rs1)
.isMatching(ResourceProfile.fromResourceSpec(rs2)));
.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs2)));
assertFalse(
ResourceProfile.fromResourceSpec(rs2)
.isMatching(ResourceProfile.fromResourceSpec(rs1)));
.allFieldsNoLessThan(ResourceProfile.fromResourceSpec(rs1)));
}

@Test
public void testUnknownMatchesUnknown() {
assertTrue(ResourceProfile.UNKNOWN.isMatching(ResourceProfile.UNKNOWN));
public void testUnknownNoLessThanUnknown() {
assertTrue(ResourceProfile.UNKNOWN.allFieldsNoLessThan(ResourceProfile.UNKNOWN));
}

@Test
public void testMatchRequirement() {
final ResourceProfile resource1 =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
.setTaskHeapMemoryMB(100)
.setTaskOffHeapMemoryMB(100)
.setManagedMemoryMB(100)
.build();
final ResourceProfile resource2 =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
.setTaskHeapMemoryMB(100)
.setTaskOffHeapMemoryMB(100)
.setManagedMemoryMB(100)
.addExtendedResource("gpu", new GPUResource(1.0))
.build();
final ResourceProfile requirement1 = ResourceProfile.UNKNOWN;
final ResourceProfile requirement2 =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
.setTaskHeapMemoryMB(100)
.setTaskOffHeapMemoryMB(100)
.setManagedMemoryMB(100)
.build();
final ResourceProfile requirement3 =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
.setTaskHeapMemoryMB(100)
.setTaskOffHeapMemoryMB(100)
.setManagedMemoryMB(100)
.addExtendedResource("gpu", new GPUResource(1.0))
.build();

assertTrue(resource1.isMatching(requirement1));
assertTrue(resource1.isMatching(requirement2));
assertFalse(resource1.isMatching(requirement3));

assertTrue(resource2.isMatching(requirement1));
assertFalse(resource2.isMatching(requirement2));
assertTrue(resource2.isMatching(requirement3));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,7 @@ public void testReserveFreeSlotForResourceUpdatesAvailableResourcesAndRequiremen

final ResourceProfile largeResourceProfile =
ResourceProfile.newBuilder().setManagedMemoryMB(1024).build();
final ResourceProfile smallResourceProfile =
ResourceProfile.newBuilder().setManagedMemoryMB(512).build();
final ResourceProfile smallResourceProfile = ResourceProfile.UNKNOWN;

slotPool.increaseResourceRequirementsBy(
ResourceCounter.withResource(largeResourceProfile, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -118,7 +117,9 @@ public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() th
createAndSetUpSlotPool(directMainThreadExecutor, null, batchSlotTimeout, clock)) {

SlotPoolUtils.offerSlots(
slotPool, directMainThreadExecutor, Collections.singletonList(resourceProfile));
slotPool,
directMainThreadExecutor,
Arrays.asList(resourceProfile, smallerResourceProfile));

final CompletableFuture<PhysicalSlot> firstSlotFuture =
SlotPoolUtils.requestNewAllocatedBatchSlot(
Expand Down Expand Up @@ -288,7 +289,7 @@ public void testPendingBatchSlotRequestTimeoutAfterSlotRelease() throws Exceptio
SlotPoolUtils.offerSlots(
slotPool,
directMainThreadExecutor,
Collections.singletonList(resourceProfile));
Arrays.asList(resourceProfile, smallerResourceProfile));
final CompletableFuture<PhysicalSlot> firstSlotFuture =
SlotPoolUtils.requestNewAllocatedBatchSlot(
slotPool, directMainThreadExecutor, resourceProfile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,14 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
@Test
public void testTurnOnKeepsPendingFulfillableRequests() throws Exception {
// setup
final ResourceProfile availableProfile = ResourceProfile.fromResources(2.0, 100);
final ResourceProfile fulfillableProfile = ResourceProfile.fromResources(1.0, 100);
final ResourceProfile resourceProfile = ResourceProfile.fromResources(2.0, 100);

final SlotManager slotManager = createSlotManagerNotStartingNewTMs();
slotManager.setFailUnfulfillableRequest(false);
registerFreeSlot(slotManager, availableProfile);
registerFreeSlot(slotManager, resourceProfile);

slotManager.registerSlotRequest(slotRequest(fulfillableProfile));
slotManager.registerSlotRequest(slotRequest(fulfillableProfile));
slotManager.registerSlotRequest(slotRequest(resourceProfile));
slotManager.registerSlotRequest(slotRequest(resourceProfile));

// test
slotManager.setFailUnfulfillableRequest(true);
Expand Down Expand Up @@ -111,7 +110,8 @@ public void testTurnOnCancelsPendingUnFulfillableRequests() throws Exception {
public void testTurnOnKeepsRequestsWithStartingTMs() throws Exception {
// setup
final ResourceProfile availableProfile = ResourceProfile.fromResources(2.0, 100);
final ResourceProfile newTmProfile = ResourceProfile.fromResources(2.0, 200);
final ResourceProfile newTmProfile =
SlotManagerImpl.generateDefaultSlotResourceProfile(WORKER_RESOURCE_SPEC, 1);

final SlotManager slotManager = createSlotManagerStartingNewTMs();
slotManager.setFailUnfulfillableRequest(false);
Expand Down Expand Up @@ -172,7 +172,8 @@ public void testUnfulfillableRequestsFailWhenOn() {
public void testStartingTmKeepsSlotPendingWhenOn() throws Exception {
// setup
final ResourceProfile availableProfile = ResourceProfile.fromResources(2.0, 100);
final ResourceProfile newTmProfile = ResourceProfile.fromResources(2.0, 200);
final ResourceProfile newTmProfile =
SlotManagerImpl.generateDefaultSlotResourceProfile(WORKER_RESOURCE_SPEC, 1);

final SlotManager slotManager = createSlotManagerStartingNewTMs();
registerFreeSlot(slotManager, availableProfile);
Expand Down

0 comments on commit 1617d36

Please sign in to comment.