Skip to content

Commit

Permalink
fixup! [FLINK-21433][runtime] Replace defaultSlotProfile with numSlot…
Browse files Browse the repository at this point in the history
…s for creating PendingTaskManager.
  • Loading branch information
xintongsong committed Feb 23, 2021
1 parent bf7b7db commit c55b5ba
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

/** Utilities for {@link SlotManager} implementations. */
public class SlotManagerUtils {

/**
* This must be consist with {@link
* org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils#generateDefaultSlotResourceProfile}.
*/
public static ResourceProfile generateDefaultSlotResourceProfile(
WorkerResourceSpec workerResourceSpec, int numSlotsPerWorker) {
return ResourceProfile.newBuilder()
Expand All @@ -34,6 +39,10 @@ public static ResourceProfile generateDefaultSlotResourceProfile(
.build();
}

/**
* This must be consist with {@link
* org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils#generateDefaultSlotResourceProfile}.
*/
public static ResourceProfile generateDefaultSlotResourceProfile(
ResourceProfile resourceProfile, int numSlotsPerWorker) {
return ResourceProfile.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,12 @@ private static void checkTaskExecutorNetworkConfigSet(ReadableConfig config) {
}
}

static ResourceProfile generateDefaultSlotResourceProfile(
/**
* This must be consist with {@link
* org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerUtils#generateDefaultSlotResourceProfile}.
*/
@VisibleForTesting
public static ResourceProfile generateDefaultSlotResourceProfile(
TaskExecutorResourceSpec taskExecutorResourceSpec, int numberOfSlots) {
return ResourceProfile.newBuilder()
.setCpuCores(taskExecutorResourceSpec.getCpuCores().divide(numberOfSlots))
Expand All @@ -118,7 +123,8 @@ static ResourceProfile generateDefaultSlotResourceProfile(
.build();
}

static ResourceProfile generateTotalAvailableResourceProfile(
@VisibleForTesting
public static ResourceProfile generateTotalAvailableResourceProfile(
TaskExecutorResourceSpec taskExecutorResourceSpec) {
return ResourceProfile.newBuilder()
.setCpuCores(taskExecutorResourceSpec.getCpuCores())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package org.apache.flink.runtime.resourcemanager.slotmanager;

import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceSpec;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
Expand All @@ -30,7 +34,7 @@
/** Tests for the {@link SlotManagerUtils}. */
public class SlotManagerUtilsTest extends TestLogger {
@Test
public void testGenerateDefaultSlotProfile() {
public void testGenerateDefaultSlotProfileFromWorkerResourceSpec() {
final int numSlots = 5;
final ResourceProfile resourceProfile =
ResourceProfile.newBuilder()
Expand All @@ -54,6 +58,60 @@ public void testGenerateDefaultSlotProfile() {
is(resourceProfile));
}

@Test
public void testGenerateDefaultSlotProfileFromTotalResourceProfile() {
final int numSlots = 5;
final ResourceProfile resourceProfile =
ResourceProfile.newBuilder()
.setCpuCores(1.0)
.setTaskHeapMemoryMB(1)
.setTaskOffHeapMemoryMB(2)
.setNetworkMemoryMB(3)
.setManagedMemoryMB(4)
.build();
final ResourceProfile totalResourceProfile =
ResourceProfile.newBuilder()
.setCpuCores(1.0 * numSlots)
.setTaskHeapMemoryMB(1 * numSlots)
.setTaskOffHeapMemoryMB(2 * numSlots)
.setNetworkMemoryMB(3 * numSlots)
.setManagedMemoryMB(4 * numSlots)
.build();

assertThat(
SlotManagerUtils.generateDefaultSlotResourceProfile(totalResourceProfile, numSlots),
is(resourceProfile));
}

@Test
public void testGenerateDefaultSlotConsistentWithTaskExecutorResourceUtils() {
final int numSlots = 5;
final TaskExecutorResourceSpec taskExecutorResourceSpec =
new TaskExecutorResourceSpec(
new CPUResource(1.0),
MemorySize.parse("1m"),
MemorySize.parse("2m"),
MemorySize.parse("3m"),
MemorySize.parse("4m"));

final ResourceProfile resourceProfileFromTaskExecutorResourceUtils =
TaskExecutorResourceUtils.generateDefaultSlotResourceProfile(
taskExecutorResourceSpec, numSlots);

final ResourceProfile totalResourceProfile =
TaskExecutorResourceUtils.generateTotalAvailableResourceProfile(
taskExecutorResourceSpec);
final WorkerResourceSpec workerResourceSpec =
WorkerResourceSpec.fromTotalResourceProfile(totalResourceProfile);

assertThat(
SlotManagerUtils.generateDefaultSlotResourceProfile(totalResourceProfile, numSlots),
is(resourceProfileFromTaskExecutorResourceUtils));
assertThat(
SlotManagerUtils.generateDefaultSlotResourceProfile(workerResourceSpec, numSlots),
is(resourceProfileFromTaskExecutorResourceUtils));
}

@Test
public void testCalculateDefaultNumSlots() {
final ResourceProfile defaultSlotResource =
Expand Down

0 comments on commit c55b5ba

Please sign in to comment.