Skip to content

Commit

Permalink
Simplify tests of SlotPoolImpl with SlotPoolBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
Thesharing committed Jul 22, 2020
1 parent 8a85d27 commit 3e263c1
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,19 @@
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.ManualClock;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -56,23 +53,20 @@
*/
public class SlotPoolBatchSlotRequestTest extends TestLogger {

private static final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forMainThread();

private static final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0, 1024);
private static final ResourceProfile smallerResourceProfile = ResourceProfile.fromResources(0.5, 512);
public static final CompletableFuture[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture[0];
private static ScheduledExecutorService singleThreadScheduledExecutorService;
private static ComponentMainThreadExecutor mainThreadExecutor;

@BeforeClass
public static void setupClass() {
singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
}
private SlotPoolBuilder slotPoolBuilder;
private TestingResourceManagerGateway testingResourceManagerGateway;

@AfterClass
public static void teardownClass() {
if (singleThreadScheduledExecutorService != null) {
singleThreadScheduledExecutorService.shutdownNow();
}
@Before
public void setup() throws Exception {
testingResourceManagerGateway = new TestingResourceManagerGateway();
slotPoolBuilder = new SlotPoolBuilder(mainThreadExecutor);
}

/**
Expand All @@ -81,7 +75,8 @@ public static void teardownClass() {
*/
@Test
public void testPendingBatchSlotRequestTimeout() throws Exception {
try (final SlotPoolImpl slotPool = new SlotPoolBuilder(mainThreadExecutor)
try (final SlotPoolImpl slotPool = slotPoolBuilder
.setBatchSlotTimeout(Time.milliseconds(2L))
.build()) {
final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(
slotPool,
Expand All @@ -104,21 +99,24 @@ public void testPendingBatchSlotRequestTimeout() throws Exception {
@Test
public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() throws Exception {
final Time batchSlotTimeout = Time.milliseconds(2L);
final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
final ManualClock clock = new ManualClock();

try (final TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
try (final TestingSlotPoolImpl slotPool = slotPoolBuilder
.setClock(clock)
.setBatchSlotTimeout(batchSlotTimeout)
.build()) {

SlotPoolUtils.offerSlots(slotPool, directMainThreadExecutor, Collections.singletonList(resourceProfile));
SlotPoolUtils.offerSlots(slotPool, mainThreadExecutor, Collections.singletonList(resourceProfile));

final CompletableFuture<PhysicalSlot> firstSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
final CompletableFuture<PhysicalSlot> secondSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, ResourceProfile.UNKNOWN);
final CompletableFuture<PhysicalSlot> thirdSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, smallerResourceProfile);
final CompletableFuture<PhysicalSlot> firstSlotFuture =
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, resourceProfile);
final CompletableFuture<PhysicalSlot> secondSlotFuture =
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN);
final CompletableFuture<PhysicalSlot> thirdSlotFuture =
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, smallerResourceProfile);

final List<CompletableFuture<PhysicalSlot>> slotFutures = Arrays.asList(firstSlotFuture, secondSlotFuture, thirdSlotFuture);
final List<CompletableFuture<PhysicalSlot>> slotFutures =
Arrays.asList(firstSlotFuture, secondSlotFuture, thirdSlotFuture);
advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, clock, batchSlotTimeout);

for (CompletableFuture<PhysicalSlot> slotFuture : slotFutures) {
Expand All @@ -134,21 +132,26 @@ public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() th
*/
@Test
public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exception {
final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
testingResourceManagerGateway.setRequestSlotConsumer(
slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

final Time batchSlotTimeout = Time.milliseconds(1000L);
try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
.setBatchSlotTimeout(batchSlotTimeout)
.setResourceManagerGateway(testingResourceManagerGateway)
.build()) {
try (final SlotPoolImpl slotPool = slotPoolBuilder
.setBatchSlotTimeout(batchSlotTimeout)
.setResourceManagerGateway(testingResourceManagerGateway)
.build()) {

final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(
slotPool,
mainThreadExecutor,
resourceProfile);

SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get(), new FlinkException("Failed request"));
SlotPoolUtils.failAllocation(
slotPool,
mainThreadExecutor,
allocationIdFuture.get(),
new FlinkException("Failed request"));

assertThat(slotFuture.isDone(), is(false));
}
Expand All @@ -160,19 +163,22 @@ public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exc
*/
@Test
public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws Exception {
final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
testingResourceManagerGateway.setRequestSlotConsumer(
slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
.setResourceManagerGateway(testingResourceManagerGateway)
.build()) {
try (final SlotPoolImpl slotPool = slotPoolBuilder
.setResourceManagerGateway(testingResourceManagerGateway)
.build()) {

final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(
slotPool,
mainThreadExecutor,
resourceProfile);

SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get(),
SlotPoolUtils.failAllocation(slotPool,
mainThreadExecutor,
allocationIdFuture.get(),
new UnfulfillableSlotRequestException(new AllocationID(), ResourceProfile.UNKNOWN));

assertThat(slotFuture.isCompletedExceptionally(), is(true));
Expand All @@ -185,18 +191,16 @@ public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() thr
*/
@Test
public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exception {
final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
testingResourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally(new FlinkException("Failed request")));

final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

final Time batchSlotTimeout = Time.milliseconds(1000L);
try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
try (final SlotPoolImpl slotPool = slotPoolBuilder
.setBatchSlotTimeout(batchSlotTimeout)
.setResourceManagerGateway(testingResourceManagerGateway)
.build()) {

final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
final CompletableFuture<PhysicalSlot> slotFuture =
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, resourceProfile);

assertThat(slotFuture.isDone(), is(false));
}
Expand All @@ -207,17 +211,15 @@ public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exce
*/
@Test
public void testPendingBatchSlotRequestFailsIfRMRequestFailsUnfulfillably() throws Exception {
final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
testingResourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally(
new UnfulfillableSlotRequestException(new AllocationID(), ResourceProfile.UNKNOWN)));

final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
try (final SlotPoolImpl slotPool = slotPoolBuilder
.setResourceManagerGateway(testingResourceManagerGateway)
.build()) {

final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
final CompletableFuture<PhysicalSlot> slotFuture =
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, resourceProfile);

assertThat(slotFuture.isCompletedExceptionally(), is(true));
}
Expand All @@ -229,27 +231,32 @@ public void testPendingBatchSlotRequestFailsIfRMRequestFailsUnfulfillably() thro
*/
@Test
public void testPendingBatchSlotRequestTimeoutAfterSlotRelease() throws Exception {
final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
final ManualClock clock = new ManualClock();
final Time batchSlotTimeout = Time.milliseconds(1000L);

try (final TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
try (final TestingSlotPoolImpl slotPool = slotPoolBuilder
.setClock(clock)
.setBatchSlotTimeout(batchSlotTimeout)
.build()) {
final ResourceID taskManagerResourceId = SlotPoolUtils.offerSlots(slotPool, directMainThreadExecutor, Collections.singletonList(resourceProfile));
final CompletableFuture<PhysicalSlot> firstSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
final CompletableFuture<PhysicalSlot> secondSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, ResourceProfile.UNKNOWN);
final CompletableFuture<PhysicalSlot> thirdSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, smallerResourceProfile);

final List<CompletableFuture<PhysicalSlot>> slotFutures = Arrays.asList(firstSlotFuture, secondSlotFuture, thirdSlotFuture);
final ResourceID taskManagerResourceId =
SlotPoolUtils.offerSlots(slotPool, mainThreadExecutor, Collections.singletonList(resourceProfile));
final CompletableFuture<PhysicalSlot> firstSlotFuture =
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, resourceProfile);
final CompletableFuture<PhysicalSlot> secondSlotFuture =
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN);
final CompletableFuture<PhysicalSlot> thirdSlotFuture =
SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, smallerResourceProfile);

final List<CompletableFuture<PhysicalSlot>> slotFutures =
Arrays.asList(firstSlotFuture, secondSlotFuture, thirdSlotFuture);

// initial batch slot timeout check
advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, clock, batchSlotTimeout);

assertThat(CompletableFuture.anyOf(slotFutures.toArray(COMPLETABLE_FUTURES_EMPTY_ARRAY)).isDone(), is(false));
assertThat(CompletableFuture.anyOf(slotFutures.toArray(COMPLETABLE_FUTURES_EMPTY_ARRAY)).isDone(),
is(false));

SlotPoolUtils.releaseTaskManager(slotPool, directMainThreadExecutor, taskManagerResourceId);
SlotPoolUtils.releaseTaskManager(slotPool, mainThreadExecutor, taskManagerResourceId);

advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, clock, batchSlotTimeout);

Expand All @@ -266,7 +273,10 @@ public void testPendingBatchSlotRequestTimeoutAfterSlotRelease() throws Exceptio
}
}

private void advanceTimeAndTriggerCheckBatchSlotTimeout(TestingSlotPoolImpl slotPool, ManualClock clock, Time batchSlotTimeout) {
private void advanceTimeAndTriggerCheckBatchSlotTimeout(
TestingSlotPoolImpl slotPool,
ManualClock clock,
Time batchSlotTimeout) {
// trigger batch slot timeout check which marks unfulfillable slots
slotPool.triggerCheckBatchSlotTimeout();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
Expand All @@ -35,9 +36,10 @@
*/
public class SlotPoolBuilder {

private ComponentMainThreadExecutor componentMainThreadExecutor;
private ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
private Time batchSlotTimeout = Time.milliseconds(2L);
private final ComponentMainThreadExecutor componentMainThreadExecutor;
private ResourceManagerGateway resourceManagerGateway;
private Time batchSlotTimeout = Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue());
private Time idleSlotTimeout = TestingUtils.infiniteTime();
private Clock clock = SystemClock.getInstance();

public SlotPoolBuilder(ComponentMainThreadExecutor componentMainThreadExecutor) {
Expand All @@ -54,23 +56,47 @@ public SlotPoolBuilder setBatchSlotTimeout(Time batchSlotTimeout) {
return this;
}

public SlotPoolBuilder setIdleSlotTimeout(Time idleSlotTimeout) {
this.idleSlotTimeout = idleSlotTimeout;
return this;
}

public SlotPoolBuilder setClock(Clock clock) {
this.clock = clock;
return this;
}

public TestingSlotPoolImpl build() throws Exception {
public TestingSlotPoolImpl build(JobID jobID, Boolean connectToResourceManager) throws Exception {
final TestingSlotPoolImpl slotPool = new TestingSlotPoolImpl(
new JobID(),
jobID,
clock,
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
idleSlotTimeout,
batchSlotTimeout);

slotPool.start(JobMasterId.generate(), "foobar", componentMainThreadExecutor);

CompletableFuture.runAsync(() -> slotPool.connectToResourceManager(resourceManagerGateway), componentMainThreadExecutor).join();
if (connectToResourceManager) {
if (resourceManagerGateway == null) {
resourceManagerGateway = new TestingResourceManagerGateway();
}
CompletableFuture.runAsync(
() -> slotPool.connectToResourceManager(resourceManagerGateway), componentMainThreadExecutor)
.join();
}

return slotPool;
}

public TestingSlotPoolImpl build(Boolean connectToResourceManager) throws Exception {
return build(new JobID(), connectToResourceManager);
}

public TestingSlotPoolImpl build(JobID jobID) throws Exception {
return build(jobID, true);
}

public TestingSlotPoolImpl build() throws Exception {
return build(new JobID());
}
}

0 comments on commit 3e263c1

Please sign in to comment.