Skip to content

Commit

Permalink
[fix-up][tests] Refactor SlotPoolImplTest with SlotPoolTestBase
Browse files Browse the repository at this point in the history
  • Loading branch information
Thesharing committed Jul 29, 2020
1 parent 664fa0a commit 084c389
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

import javax.annotation.Nullable;

import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -37,16 +39,20 @@
public class SlotPoolBuilder {

private final ComponentMainThreadExecutor componentMainThreadExecutor;
private ResourceManagerGateway resourceManagerGateway;

private JobID jobId = new JobID();
private Time batchSlotTimeout = Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue());
private Time idleSlotTimeout = TestingUtils.infiniteTime();
private Clock clock = SystemClock.getInstance();

@Nullable
private ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();

public SlotPoolBuilder(ComponentMainThreadExecutor componentMainThreadExecutor) {
this.componentMainThreadExecutor = componentMainThreadExecutor;
}

public SlotPoolBuilder setResourceManagerGateway(ResourceManagerGateway resourceManagerGateway) {
public SlotPoolBuilder setResourceManagerGateway(@Nullable ResourceManagerGateway resourceManagerGateway) {
this.resourceManagerGateway = resourceManagerGateway;
return this;
}
Expand All @@ -66,37 +72,27 @@ public SlotPoolBuilder setClock(Clock clock) {
return this;
}

public TestingSlotPoolImpl build(JobID jobID, Boolean connectToResourceManager) throws Exception {
public SlotPoolBuilder setJobId(JobID jobId) {
this.jobId = jobId;
return this;
}

public TestingSlotPoolImpl build() throws Exception {
final TestingSlotPoolImpl slotPool = new TestingSlotPoolImpl(
jobID,
jobId,
clock,
TestingUtils.infiniteTime(),
idleSlotTimeout,
batchSlotTimeout);

slotPool.start(JobMasterId.generate(), "foobar", componentMainThreadExecutor);

if (connectToResourceManager) {
if (resourceManagerGateway == null) {
resourceManagerGateway = new TestingResourceManagerGateway();
}
if (resourceManagerGateway != null) {
CompletableFuture.runAsync(
() -> slotPool.connectToResourceManager(resourceManagerGateway), componentMainThreadExecutor)
.join();
}

return slotPool;
}

public TestingSlotPoolImpl build(Boolean connectToResourceManager) throws Exception {
return build(new JobID(), connectToResourceManager);
}

public TestingSlotPoolImpl build(JobID jobID) throws Exception {
return build(jobID, true);
}

public TestingSlotPoolImpl build() throws Exception {
return build(new JobID());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
Expand All @@ -42,6 +40,7 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;

import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -86,10 +85,6 @@
*/
public class SlotPoolImplTest extends SlotPoolTestBase {

private static final Time TIMEOUT = Time.seconds(10L);
private static final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forMainThread();

private TaskManagerLocation taskManagerLocation;
private SimpleAckingTaskManagerGateway taskManagerGateway;

Expand Down Expand Up @@ -445,10 +440,7 @@ public void testShutdownReleasesAllSlots() throws Exception {
public void testCheckIdleSlot() throws Exception {
final ManualClock clock = new ManualClock();

try (TestingSlotPoolImpl slotPool = slotPoolBuilder
.setClock(clock)
.setIdleSlotTimeout(TIMEOUT)
.build()) {
try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(clock, TIMEOUT)) {
final BlockingQueue<AllocationID> freedSlots = new ArrayBlockingQueue<>(1);
taskManagerGateway.setFreeSlotFunction(
(AllocationID allocationId, Throwable cause) -> {
Expand Down Expand Up @@ -495,10 +487,7 @@ public void testCheckIdleSlot() throws Exception {
public void testDiscardIdleSlotIfReleasingFailed() throws Exception {
final ManualClock clock = new ManualClock();

try (TestingSlotPoolImpl slotPool = slotPoolBuilder
.setClock(clock)
.setIdleSlotTimeout(TIMEOUT)
.build()) {
try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(clock, TIMEOUT)) {
final AllocationID expiredAllocationId = new AllocationID();
final SlotOffer slotToExpire = new SlotOffer(expiredAllocationId, 0, ResourceProfile.ANY);

Expand Down Expand Up @@ -597,9 +586,9 @@ public void testFreeFailedSlots() throws Exception {
*/
@Test
public void testCreateAllocatedSlotReport() throws Exception {
final JobID jobID = new JobID();
final JobID jobId = new JobID();

try (SlotPoolImpl slotPool = slotPoolBuilder.build(jobID)) {
try (SlotPoolImpl slotPool = createAndSetUpSlotPool(jobId)) {

final ArrayBlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(1);
resourceManagerGateway.setRequestSlotConsumer(
Expand Down Expand Up @@ -628,21 +617,20 @@ public void testCreateAllocatedSlotReport() throws Exception {
slotRequestFuture.get();

final AllocatedSlotReport slotReport = slotPool.createAllocatedSlotReport(taskManagerLocation.getResourceID());
assertThat(jobID, is(slotReport.getJobId()));
assertThat(jobId, is(slotReport.getJobId()));
assertThat(slotReport.getAllocatedSlotInfos(), containsInAnyOrder(isEachEqual(allocatedSlotInfos)));
}
}

@Test
public void testCalculationOfTaskExecutorUtilization() throws Exception {
try (final SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());

final TaskManagerLocation firstTaskManagerLocation = new LocalTaskManagerLocation();
final TaskManagerLocation secondTaskManagerLocation = new LocalTaskManagerLocation();

final List<AllocationID> firstTaskManagersSlots = offerSlots(firstTaskManagerLocation, slotPool, 4);
final List<AllocationID> secondTaskManagersSlots = offerSlots(secondTaskManagerLocation, slotPool, 4);
final List<AllocationID> firstTaskManagersSlots = registerAndOfferSlots(firstTaskManagerLocation, slotPool, 4);
final List<AllocationID> secondTaskManagersSlots = registerAndOfferSlots(secondTaskManagerLocation, slotPool, 4);

slotPool.allocateAvailableSlot(new SlotRequestId(), firstTaskManagersSlots.get(0));
slotPool.allocateAvailableSlot(new SlotRequestId(), firstTaskManagersSlots.get(1));
Expand Down Expand Up @@ -671,7 +659,6 @@ public void testOrphanedAllocationCanBeRemapped() throws Exception {
resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);

try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());

final SlotRequestId slotRequestId1 = new SlotRequestId();
final SlotRequestId slotRequestId2 = new SlotRequestId();
Expand All @@ -680,7 +667,7 @@ public void testOrphanedAllocationCanBeRemapped() throws Exception {
final AllocationID allocationId1 = allocationIds.get(0);
final AllocationID allocationId2 = allocationIds.get(1);

offerSlot(slotPool, allocationId2);
registerAndOfferSlot(taskManagerLocation, slotPool, allocationId2);

// verify that orphaned allocationId2 is remapped to slotRequestId2
assertThat(slotPool.getPendingRequests().values(), hasSize(1));
Expand All @@ -699,9 +686,7 @@ public void testOrphanedAllocationIsCanceledIfNotRemapped() throws Exception {
final List<AllocationID> canceledAllocations = new ArrayList<>();
resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);

try (SlotPoolImpl slotPool = slotPoolBuilder
.setResourceManagerGateway(resourceManagerGateway)
.build()) {
try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());

final SlotRequestId slotRequestId1 = new SlotRequestId();
Expand All @@ -717,7 +702,7 @@ public void testOrphanedAllocationIsCanceledIfNotRemapped() throws Exception {
randomAllocationId = new AllocationID();
} while (randomAllocationId.equals(allocationId1) || randomAllocationId.equals(allocationId2));

offerSlot(slotPool, randomAllocationId);
registerAndOfferSlot(taskManagerLocation, slotPool, randomAllocationId);

assertThat(slotPool.getPendingRequests().values(), hasSize(1));
assertThat(canceledAllocations, contains(allocationId1));
Expand All @@ -739,8 +724,7 @@ public void testSlotsOfferedWithoutResourceManagerConnected() throws Exception {
assertThat(slotPool.getWaitingForResourceManager().values(), hasSize(1));

final AllocationID allocationId = new AllocationID();
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
offerSlot(slotPool, allocationId);
registerAndOfferSlot(taskManagerLocation, slotPool, allocationId);

assertThat(slotPool.getWaitingForResourceManager().values(), hasSize(0));
assertThat(slotFuture.isDone(), is(true));
Expand All @@ -749,15 +733,28 @@ public void testSlotsOfferedWithoutResourceManagerConnected() throws Exception {
}
}

private void offerSlot(final SlotPoolImpl slotPool, final AllocationID allocationId) {
private TestingSlotPoolImpl createAndSetUpSlotPool(Clock clock, Time idleSlotTimeout) throws Exception {
return slotPoolBuilder.setClock(clock).setIdleSlotTimeout(idleSlotTimeout).build();
}

private void registerAndOfferSlot(
final TaskManagerLocation taskManagerLocation,
final SlotPoolImpl slotPool,
final AllocationID allocationId) {

registerTaskManager(slotPool, taskManagerLocation);

final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.ANY);
slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer);
}

private List<AllocationID> offerSlots(
TaskManagerLocation taskManagerLocation,
SlotPoolImpl slotPool,
int numberOfSlotsToRegister) {
private List<AllocationID> registerAndOfferSlots(
final TaskManagerLocation taskManagerLocation,
final SlotPoolImpl slotPool,
final int numberOfSlotsToRegister) {

registerTaskManager(slotPool, taskManagerLocation);

final List<AllocationID> allocationIds = IntStream.range(0, numberOfSlotsToRegister)
.mapToObj(ignored -> new AllocationID())
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;

import org.junit.Before;
Expand Down Expand Up @@ -34,22 +37,38 @@ protected TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception {
return slotPoolBuilder.build();
}

protected void requestNewAllocatedSlots(final SlotPool slotPool, final SlotRequestId... slotRequestIds) {
protected TestingSlotPoolImpl createAndSetUpSlotPool(final JobID jobId) throws Exception {
return slotPoolBuilder.setJobId(jobId).build();
}

protected TestingSlotPoolImpl createAndSetUpSlotPool(
final ResourceManagerGateway resourceManagerGateway) throws Exception {

return slotPoolBuilder.setResourceManagerGateway(resourceManagerGateway).build();
}

static void registerTaskManager(final SlotPool slotPool, final TaskManagerLocation taskManagerLocation) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
}

static void requestNewAllocatedSlots(final SlotPool slotPool, final SlotRequestId... slotRequestIds) {
for (SlotRequestId slotRequestId : slotRequestIds) {
requestNewAllocatedSlot(slotPool, slotRequestId);
}
}

protected CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
final SlotPool slotPool,
final SlotRequestId slotRequestId) {
static CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
final SlotPool slotPool,
final SlotRequestId slotRequestId) {

return requestNewAllocatedSlot(slotPool, slotRequestId, TIMEOUT);
}

protected CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
final SlotPool slotPool,
final SlotRequestId slotRequestId,
final Time timeout) {
static CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
final SlotPool slotPool,
final SlotRequestId slotRequestId,
final Time timeout) {

return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout);
}
}

0 comments on commit 084c389

Please sign in to comment.