Skip to content

Commit

Permalink
[FLINK-18355][tests] Refactor tests of SlotPoolImpl with SlotPoolUtil…
Browse files Browse the repository at this point in the history
…s and SlotPoolBuilder
  • Loading branch information
Thesharing authored and zhuzhurk committed Aug 13, 2020
1 parent 8cf4dde commit 8593489
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 249 deletions.
Expand Up @@ -25,17 +25,21 @@
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -81,8 +85,7 @@ public static void teardownClass() {
*/
@Test
public void testPendingBatchSlotRequestTimeout() throws Exception {
try (final SlotPoolImpl slotPool = new SlotPoolBuilder(mainThreadExecutor)
.build()) {
try (final SlotPoolImpl slotPool = createAndSetUpSlotPool(mainThreadExecutor, null, Time.milliseconds(2L))) {
final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(
slotPool,
mainThreadExecutor,
Expand All @@ -107,10 +110,11 @@ public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() th
final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
final ManualClock clock = new ManualClock();

try (final TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
.setClock(clock)
.setBatchSlotTimeout(batchSlotTimeout)
.build()) {
try (final TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(
directMainThreadExecutor,
null,
batchSlotTimeout,
clock)) {

SlotPoolUtils.offerSlots(slotPool, directMainThreadExecutor, Collections.singletonList(resourceProfile));

Expand Down Expand Up @@ -141,10 +145,10 @@ public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exc
final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

final Time batchSlotTimeout = Time.milliseconds(1000L);
try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
.setBatchSlotTimeout(batchSlotTimeout)
.setResourceManagerGateway(testingResourceManagerGateway)
.build()) {
try (final SlotPoolImpl slotPool = createAndSetUpSlotPool(
directMainThreadExecutor,
testingResourceManagerGateway,
batchSlotTimeout)) {

final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);

Expand Down Expand Up @@ -191,10 +195,10 @@ public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exce
final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

final Time batchSlotTimeout = Time.milliseconds(1000L);
try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
.setBatchSlotTimeout(batchSlotTimeout)
.setResourceManagerGateway(testingResourceManagerGateway)
.build()) {
try (final SlotPoolImpl slotPool = createAndSetUpSlotPool(
directMainThreadExecutor,
testingResourceManagerGateway,
batchSlotTimeout)) {

final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);

Expand Down Expand Up @@ -233,10 +237,12 @@ public void testPendingBatchSlotRequestTimeoutAfterSlotRelease() throws Exceptio
final ManualClock clock = new ManualClock();
final Time batchSlotTimeout = Time.milliseconds(1000L);

try (final TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
.setClock(clock)
.setBatchSlotTimeout(batchSlotTimeout)
.build()) {
try (final TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(
directMainThreadExecutor,
null,
batchSlotTimeout,
clock)) {

final ResourceID taskManagerResourceId = SlotPoolUtils.offerSlots(slotPool, directMainThreadExecutor, Collections.singletonList(resourceProfile));
final CompletableFuture<PhysicalSlot> firstSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
final CompletableFuture<PhysicalSlot> secondSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, ResourceProfile.UNKNOWN);
Expand Down Expand Up @@ -276,4 +282,28 @@ private void advanceTimeAndTriggerCheckBatchSlotTimeout(TestingSlotPoolImpl slot
// timeout all as unfulfillable marked slots
slotPool.triggerCheckBatchSlotTimeout();
}

private TestingSlotPoolImpl createAndSetUpSlotPool(
final ComponentMainThreadExecutor componentMainThreadExecutor,
@Nullable final ResourceManagerGateway resourceManagerGateway,
final Time batchSlotTimeout) throws Exception {

return new SlotPoolBuilder(componentMainThreadExecutor)
.setResourceManagerGateway(resourceManagerGateway)
.setBatchSlotTimeout(batchSlotTimeout)
.build();
}

private TestingSlotPoolImpl createAndSetUpSlotPool(
final ComponentMainThreadExecutor componentMainThreadExecutor,
@Nullable final ResourceManagerGateway resourceManagerGateway,
final Time batchSlotTimeout,
final Clock clock) throws Exception {

return new SlotPoolBuilder(componentMainThreadExecutor)
.setResourceManagerGateway(resourceManagerGateway)
.setBatchSlotTimeout(batchSlotTimeout)
.setClock(clock)
.build();
}
}
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
Expand All @@ -28,23 +29,30 @@
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

import javax.annotation.Nullable;

import java.util.concurrent.CompletableFuture;

/**
* Builder for a {@link TestingSlotPoolImpl}.
*/
public class SlotPoolBuilder {

private ComponentMainThreadExecutor componentMainThreadExecutor;
private ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
private Time batchSlotTimeout = Time.milliseconds(2L);
private final ComponentMainThreadExecutor componentMainThreadExecutor;

private JobID jobId = new JobID();
private Time batchSlotTimeout = Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue());
private Time idleSlotTimeout = TestingUtils.infiniteTime();
private Clock clock = SystemClock.getInstance();

@Nullable
private ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();

public SlotPoolBuilder(ComponentMainThreadExecutor componentMainThreadExecutor) {
this.componentMainThreadExecutor = componentMainThreadExecutor;
}

public SlotPoolBuilder setResourceManagerGateway(ResourceManagerGateway resourceManagerGateway) {
public SlotPoolBuilder setResourceManagerGateway(@Nullable ResourceManagerGateway resourceManagerGateway) {
this.resourceManagerGateway = resourceManagerGateway;
return this;
}
Expand All @@ -54,22 +62,36 @@ public SlotPoolBuilder setBatchSlotTimeout(Time batchSlotTimeout) {
return this;
}

public SlotPoolBuilder setIdleSlotTimeout(Time idleSlotTimeout) {
this.idleSlotTimeout = idleSlotTimeout;
return this;
}

public SlotPoolBuilder setClock(Clock clock) {
this.clock = clock;
return this;
}

public SlotPoolBuilder setJobId(JobID jobId) {
this.jobId = jobId;
return this;
}

public TestingSlotPoolImpl build() throws Exception {
final TestingSlotPoolImpl slotPool = new TestingSlotPoolImpl(
new JobID(),
jobId,
clock,
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
idleSlotTimeout,
batchSlotTimeout);

slotPool.start(JobMasterId.generate(), "foobar", componentMainThreadExecutor);

CompletableFuture.runAsync(() -> slotPool.connectToResourceManager(resourceManagerGateway), componentMainThreadExecutor).join();
if (resourceManagerGateway != null) {
CompletableFuture.runAsync(
() -> slotPool.connectToResourceManager(resourceManagerGateway), componentMainThreadExecutor)
.join();
}

return slotPool;
}
Expand Down

0 comments on commit 8593489

Please sign in to comment.