From 8593489cefdf064a7cbdf8f0f810361a9834810a Mon Sep 17 00:00:00 2001 From: Thesharing Date: Wed, 5 Aug 2020 15:54:48 +0800 Subject: [PATCH] [FLINK-18355][tests] Refactor tests of SlotPoolImpl with SlotPoolUtils and SlotPoolBuilder --- .../SlotPoolBatchSlotRequestTest.java | 68 +++++-- .../jobmaster/slotpool/SlotPoolBuilder.java | 36 +++- .../jobmaster/slotpool/SlotPoolImplTest.java | 192 +++++++----------- .../slotpool/SlotPoolInteractionsTest.java | 80 +++----- .../SlotPoolPendingRequestFailureTest.java | 39 +--- .../SlotPoolRequestCompletionTest.java | 24 +-- .../jobmaster/slotpool/SlotPoolUtils.java | 36 ++++ 7 files changed, 226 insertions(+), 249 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java index 8d1c23a9efd7b..bbf848d994bd9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java @@ -25,17 +25,21 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; -import org.apache.flink.util.clock.ManualClock; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.ManualClock; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -81,8 +85,7 @@ public static void teardownClass() { */ @Test public void testPendingBatchSlotRequestTimeout() throws Exception { - try (final SlotPoolImpl slotPool = new SlotPoolBuilder(mainThreadExecutor) - .build()) { + try (final SlotPoolImpl slotPool = createAndSetUpSlotPool(mainThreadExecutor, null, Time.milliseconds(2L))) { final CompletableFuture slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot( slotPool, mainThreadExecutor, @@ -107,10 +110,11 @@ public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() th final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread(); final ManualClock clock = new ManualClock(); - try (final TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor) - .setClock(clock) - .setBatchSlotTimeout(batchSlotTimeout) - .build()) { + try (final TestingSlotPoolImpl slotPool = createAndSetUpSlotPool( + directMainThreadExecutor, + null, + batchSlotTimeout, + clock)) { SlotPoolUtils.offerSlots(slotPool, directMainThreadExecutor, Collections.singletonList(resourceProfile)); @@ -141,10 +145,10 @@ public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exc final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread(); final Time batchSlotTimeout = Time.milliseconds(1000L); - try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor) - .setBatchSlotTimeout(batchSlotTimeout) - .setResourceManagerGateway(testingResourceManagerGateway) - .build()) { + try (final SlotPoolImpl slotPool = createAndSetUpSlotPool( + directMainThreadExecutor, + testingResourceManagerGateway, + batchSlotTimeout)) { final CompletableFuture slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile); @@ -191,10 +195,10 @@ public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exce final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread(); final Time batchSlotTimeout = Time.milliseconds(1000L); - try (final SlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor) - .setBatchSlotTimeout(batchSlotTimeout) - .setResourceManagerGateway(testingResourceManagerGateway) - .build()) { + try (final SlotPoolImpl slotPool = createAndSetUpSlotPool( + directMainThreadExecutor, + testingResourceManagerGateway, + batchSlotTimeout)) { final CompletableFuture slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile); @@ -233,10 +237,12 @@ public void testPendingBatchSlotRequestTimeoutAfterSlotRelease() throws Exceptio final ManualClock clock = new ManualClock(); final Time batchSlotTimeout = Time.milliseconds(1000L); - try (final TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor) - .setClock(clock) - .setBatchSlotTimeout(batchSlotTimeout) - .build()) { + try (final TestingSlotPoolImpl slotPool = createAndSetUpSlotPool( + directMainThreadExecutor, + null, + batchSlotTimeout, + clock)) { + final ResourceID taskManagerResourceId = SlotPoolUtils.offerSlots(slotPool, directMainThreadExecutor, Collections.singletonList(resourceProfile)); final CompletableFuture firstSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile); final CompletableFuture secondSlotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, ResourceProfile.UNKNOWN); @@ -276,4 +282,28 @@ private void advanceTimeAndTriggerCheckBatchSlotTimeout(TestingSlotPoolImpl slot // timeout all as unfulfillable marked slots slotPool.triggerCheckBatchSlotTimeout(); } + + private TestingSlotPoolImpl createAndSetUpSlotPool( + final ComponentMainThreadExecutor componentMainThreadExecutor, + @Nullable final ResourceManagerGateway resourceManagerGateway, + final Time batchSlotTimeout) throws Exception { + + return new SlotPoolBuilder(componentMainThreadExecutor) + .setResourceManagerGateway(resourceManagerGateway) + .setBatchSlotTimeout(batchSlotTimeout) + .build(); + } + + private TestingSlotPoolImpl createAndSetUpSlotPool( + final ComponentMainThreadExecutor componentMainThreadExecutor, + @Nullable final ResourceManagerGateway resourceManagerGateway, + final Time batchSlotTimeout, + final Clock clock) throws Exception { + + return new SlotPoolBuilder(componentMainThreadExecutor) + .setResourceManagerGateway(resourceManagerGateway) + .setBatchSlotTimeout(batchSlotTimeout) + .setClock(clock) + .build(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBuilder.java index bd1cbe8d1b7d2..3e682cfab35ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBuilder.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; @@ -28,6 +29,8 @@ import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; +import javax.annotation.Nullable; + import java.util.concurrent.CompletableFuture; /** @@ -35,16 +38,21 @@ */ public class SlotPoolBuilder { - private ComponentMainThreadExecutor componentMainThreadExecutor; - private ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); - private Time batchSlotTimeout = Time.milliseconds(2L); + private final ComponentMainThreadExecutor componentMainThreadExecutor; + + private JobID jobId = new JobID(); + private Time batchSlotTimeout = Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue()); + private Time idleSlotTimeout = TestingUtils.infiniteTime(); private Clock clock = SystemClock.getInstance(); + @Nullable + private ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + public SlotPoolBuilder(ComponentMainThreadExecutor componentMainThreadExecutor) { this.componentMainThreadExecutor = componentMainThreadExecutor; } - public SlotPoolBuilder setResourceManagerGateway(ResourceManagerGateway resourceManagerGateway) { + public SlotPoolBuilder setResourceManagerGateway(@Nullable ResourceManagerGateway resourceManagerGateway) { this.resourceManagerGateway = resourceManagerGateway; return this; } @@ -54,22 +62,36 @@ public SlotPoolBuilder setBatchSlotTimeout(Time batchSlotTimeout) { return this; } + public SlotPoolBuilder setIdleSlotTimeout(Time idleSlotTimeout) { + this.idleSlotTimeout = idleSlotTimeout; + return this; + } + public SlotPoolBuilder setClock(Clock clock) { this.clock = clock; return this; } + public SlotPoolBuilder setJobId(JobID jobId) { + this.jobId = jobId; + return this; + } + public TestingSlotPoolImpl build() throws Exception { final TestingSlotPoolImpl slotPool = new TestingSlotPoolImpl( - new JobID(), + jobId, clock, TestingUtils.infiniteTime(), - TestingUtils.infiniteTime(), + idleSlotTimeout, batchSlotTimeout); slotPool.start(JobMasterId.generate(), "foobar", componentMainThreadExecutor); - CompletableFuture.runAsync(() -> slotPool.connectToResourceManager(resourceManagerGateway), componentMainThreadExecutor).join(); + if (resourceManagerGateway != null) { + CompletableFuture.runAsync( + () -> slotPool.connectToResourceManager(resourceManagerGateway), componentMainThreadExecutor) + .join(); + } return slotPool; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java index cf6890a3aa850..eb74f37f3ddee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java @@ -42,10 +42,10 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.ManualClock; import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; @@ -57,8 +57,6 @@ import org.junit.Before; import org.junit.Test; -import javax.annotation.Nonnull; - import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -74,7 +72,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; +import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils.requestNewAllocatedSlot; +import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils.requestNewAllocatedSlots; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.contains; @@ -92,9 +91,7 @@ */ public class SlotPoolImplTest extends TestLogger { - private final Time timeout = Time.seconds(10L); - - private JobID jobId; + private static final Time TIMEOUT = SlotPoolUtils.TIMEOUT; private TaskManagerLocation taskManagerLocation; @@ -102,13 +99,11 @@ public class SlotPoolImplTest extends TestLogger { private TestingResourceManagerGateway resourceManagerGateway; - private ComponentMainThreadExecutor mainThreadExecutor = + private static final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread(); @Before public void setUp() throws Exception { - this.jobId = new JobID(); - taskManagerLocation = new LocalTaskManagerLocation(); taskManagerGateway = new SimpleAckingTaskManagerGateway(); resourceManagerGateway = new TestingResourceManagerGateway(); @@ -119,21 +114,15 @@ public void testAllocateSimpleSlot() throws Exception { CompletableFuture slotRequestFuture = new CompletableFuture<>(); resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete); - try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { - slotPool.registerTaskManager(taskManagerLocation.getResourceID()); + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { - SlotRequestId requestId = new SlotRequestId(); + final SlotRequestId requestId = new SlotRequestId(); final CompletableFuture future = requestNewAllocatedSlot(slotPool, requestId); assertFalse(future.isDone()); - final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + final SlotRequest slotRequest = slotRequestFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); - final SlotOffer slotOffer = new SlotOffer( - slotRequest.getAllocationId(), - 0, - DEFAULT_TESTING_PROFILE); - - assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); + assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, slotRequest.getAllocationId())); final PhysicalSlot physicalSlot = future.get(1, TimeUnit.SECONDS); assertTrue(future.isDone()); @@ -142,11 +131,6 @@ public void testAllocateSimpleSlot() throws Exception { } } - @Nonnull - private SlotPoolImpl createSlotPoolImpl() { - return new TestingSlotPoolImpl(jobId); - } - @Test public void testAllocationFulfilledByReturnedSlot() throws Exception { final ArrayBlockingQueue slotRequestQueue = new ArrayBlockingQueue<>(2); @@ -157,8 +141,7 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception { } }); - try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { - slotPool.registerTaskManager(taskManagerLocation.getResourceID()); + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { final SlotRequestId requestId1 = new SlotRequestId(); final CompletableFuture future1 = requestNewAllocatedSlot( @@ -175,15 +158,10 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception { final List slotRequests = new ArrayList<>(2); for (int i = 0; i < 2; i++) { - slotRequests.add(slotRequestQueue.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)); + slotRequests.add(slotRequestQueue.poll(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)); } - final SlotOffer slotOffer = new SlotOffer( - slotRequests.get(0).getAllocationId(), - 0, - DEFAULT_TESTING_PROFILE); - - assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); + assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, slotRequests.get(0).getAllocationId())); final PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); @@ -205,15 +183,10 @@ public void testAllocateWithFreeSlot() throws Exception { final CompletableFuture slotRequestFuture = new CompletableFuture<>(); resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete); - try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { - slotPool.registerTaskManager(taskManagerLocation.getResourceID()); + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { final AllocationID allocationId = new AllocationID(); - final SlotOffer slotOffer = new SlotOffer( - allocationId, - 0, - DEFAULT_TESTING_PROFILE); - assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); + assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, allocationId)); assertEquals(1, slotPool.getAvailableSlots().size()); assertEquals(0, slotPool.getAllocatedSlots().size()); @@ -234,7 +207,7 @@ public void testOfferSlot() throws Exception { resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete); - try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { slotPool.registerTaskManager(taskManagerLocation.getResourceID()); final SlotRequestId requestId = new SlotRequestId(); @@ -243,12 +216,12 @@ public void testOfferSlot() throws Exception { requestId); assertFalse(future.isDone()); - final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + final SlotRequest slotRequest = slotRequestFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); final SlotOffer slotOffer = new SlotOffer( slotRequest.getAllocationId(), 0, - DEFAULT_TESTING_PROFILE); + ResourceProfile.ANY); final TaskManagerLocation invalidTaskManagerLocation = new LocalTaskManagerLocation(); @@ -258,12 +231,12 @@ public void testOfferSlot() throws Exception { final SlotOffer nonRequestedSlotOffer = new SlotOffer( new AllocationID(), 0, - DEFAULT_TESTING_PROFILE); + ResourceProfile.ANY); // we'll also accept non requested slots assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, nonRequestedSlotOffer)); assertEquals(1, slotPool.getAllocatedSlots().size()); - final PhysicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + final PhysicalSlot slot = future.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); assertEquals(taskManagerLocation, slot.getTaskManagerLocation()); assertEquals(nonRequestedSlotOffer.getAllocationId(), slot.getAllocationId()); @@ -279,7 +252,7 @@ public void testOfferSlot() throws Exception { final SlotOffer anotherSlotOfferWithSameAllocationId = new SlotOffer( slotRequest.getAllocationId(), 1, - DEFAULT_TESTING_PROFILE); + ResourceProfile.ANY); assertFalse(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, anotherSlotOfferWithSameAllocationId)); TaskManagerLocation anotherTaskManagerLocation = new LocalTaskManagerLocation(); @@ -299,26 +272,20 @@ public void testReleaseResource() throws Exception { resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete); - try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { - slotPool.registerTaskManager(taskManagerLocation.getResourceID()); + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { final SlotRequestId requestId1 = new SlotRequestId(); final CompletableFuture future1 = requestNewAllocatedSlot( slotPool, requestId1); - final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + final SlotRequest slotRequest = slotRequestFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); final CompletableFuture future2 = requestNewAllocatedSlot( slotPool, new SlotRequestId()); - final SlotOffer slotOffer = new SlotOffer( - slotRequest.getAllocationId(), - 0, - DEFAULT_TESTING_PROFILE); - - assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); + assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, slotRequest.getAllocationId())); final PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); @@ -357,7 +324,7 @@ public void testReleaseResource() throws Exception { @Test public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception { - try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { final ArrayBlockingQueue allocationIds = new ArrayBlockingQueue<>(2); resourceManagerGateway.setRequestSlotConsumer( (SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId())); @@ -393,11 +360,7 @@ public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception assertEquals(allocationId1, canceledAllocations.take()); - final SlotOffer slotOffer = new SlotOffer(allocationId1, 0, ResourceProfile.ANY); - - slotPool.registerTaskManager(taskManagerLocation.getResourceID()); - - assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); + assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, allocationId1)); // the slot offer should fulfill the second slot request assertEquals(allocationId1, slotFuture2.get().getAllocationId()); @@ -413,7 +376,7 @@ public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception @Test public void testShutdownReleasesAllSlots() throws Exception { - try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { slotPool.registerTaskManager(taskManagerLocation.getResourceID()); final int numSlotOffers = 2; @@ -462,7 +425,7 @@ public void testShutdownReleasesAllSlots() throws Exception { public void testCheckIdleSlot() throws Exception { final ManualClock clock = new ManualClock(); - try (TestingSlotPoolImpl slotPool = createSlotPoolImpl(clock)) { + try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway, clock, TIMEOUT)) { final BlockingQueue freedSlots = new ArrayBlockingQueue<>(1); taskManagerGateway.setFreeSlotFunction( (AllocationID allocationId, Throwable cause) -> { @@ -474,8 +437,6 @@ public void testCheckIdleSlot() throws Exception { } }); - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - final AllocationID expiredSlotID = new AllocationID(); final AllocationID freshSlotID = new AllocationID(); final SlotOffer slotToExpire = new SlotOffer(expiredSlotID, 0, ResourceProfile.ANY); @@ -488,7 +449,7 @@ public void testCheckIdleSlot() throws Exception { slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotToExpire), Matchers.is(true)); - clock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + clock.advanceTime(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); assertThat(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotToNotExpire), Matchers.is(true)); @@ -497,23 +458,13 @@ public void testCheckIdleSlot() throws Exception { slotPool.triggerCheckIdleSlot(); - final AllocationID freedSlot = freedSlots.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + final AllocationID freedSlot = freedSlots.poll(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); assertThat(freedSlot, Matchers.is(expiredSlotID)); assertThat(freedSlots.isEmpty(), Matchers.is(true)); } } - @Nonnull - private TestingSlotPoolImpl createSlotPoolImpl(ManualClock clock) { - return new TestingSlotPoolImpl( - jobId, - clock, - TestingUtils.infiniteTime(), - timeout, - TestingUtils.infiniteTime()); - } - /** * Tests that idle slots which cannot be released will be discarded. See FLINK-11059. */ @@ -521,9 +472,7 @@ private TestingSlotPoolImpl createSlotPoolImpl(ManualClock clock) { public void testDiscardIdleSlotIfReleasingFailed() throws Exception { final ManualClock clock = new ManualClock(); - try (TestingSlotPoolImpl slotPool = createSlotPoolImpl(clock)) { - - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); + try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway, clock, TIMEOUT)) { final AllocationID expiredAllocationId = new AllocationID(); final SlotOffer slotToExpire = new SlotOffer(expiredAllocationId, 0, ResourceProfile.ANY); @@ -538,7 +487,7 @@ public void testDiscardIdleSlotIfReleasingFailed() throws Exception { assertThat(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotToExpire), Matchers.is(true)); - clock.advanceTime(timeout.toMilliseconds() + 1, TimeUnit.MILLISECONDS); + clock.advanceTime(TIMEOUT.toMilliseconds() + 1, TimeUnit.MILLISECONDS); slotPool.triggerCheckIdleSlot(); @@ -565,7 +514,7 @@ public void testDiscardIdleSlotIfReleasingFailed() throws Exception { @Test public void testFreeFailedSlots() throws Exception { - try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { final int parallelism = 5; final ArrayBlockingQueue allocationIds = new ArrayBlockingQueue<>(parallelism); resourceManagerGateway.setRequestSlotConsumer( @@ -622,8 +571,9 @@ public void testFreeFailedSlots() throws Exception { */ @Test public void testCreateAllocatedSlotReport() throws Exception { + final JobID jobId = new JobID(); - try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway, jobId)) { final ArrayBlockingQueue allocationIds = new ArrayBlockingQueue<>(1); resourceManagerGateway.setRequestSlotConsumer( @@ -658,7 +608,7 @@ public void testCreateAllocatedSlotReport() throws Exception { @Test public void testCalculationOfTaskExecutorUtilization() throws Exception { - try (final SlotPoolImpl slotPool = createAndSetUpSlotPool()) { + try (final SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { final TaskManagerLocation firstTaskManagerLocation = new LocalTaskManagerLocation(); final TaskManagerLocation secondTaskManagerLocation = new LocalTaskManagerLocation(); @@ -691,7 +641,7 @@ public void testOrphanedAllocationCanBeRemapped() throws Exception { final List canceledAllocations = new ArrayList<>(); resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add); - try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { final SlotRequestId slotRequestId1 = new SlotRequestId(); final SlotRequestId slotRequestId2 = new SlotRequestId(); requestNewAllocatedSlots(slotPool, slotRequestId1, slotRequestId2); @@ -699,7 +649,7 @@ public void testOrphanedAllocationCanBeRemapped() throws Exception { final AllocationID allocationId1 = allocationIds.get(0); final AllocationID allocationId2 = allocationIds.get(1); - offerSlot(slotPool, allocationId2); + registerAndOfferSlot(taskManagerLocation, slotPool, allocationId2); // verify that orphaned allocationId2 is remapped to slotRequestId2 assertThat(slotPool.getPendingRequests().values(), hasSize(1)); @@ -718,7 +668,7 @@ public void testOrphanedAllocationIsCanceledIfNotRemapped() throws Exception { final List canceledAllocations = new ArrayList<>(); resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add); - try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { + try (SlotPoolImpl slotPool = SlotPoolUtils.createAndSetUpSlotPool(resourceManagerGateway)) { final SlotRequestId slotRequestId1 = new SlotRequestId(); final SlotRequestId slotRequestId2 = new SlotRequestId(); requestNewAllocatedSlots(slotPool, slotRequestId1, slotRequestId2); @@ -732,7 +682,7 @@ public void testOrphanedAllocationIsCanceledIfNotRemapped() throws Exception { randomAllocationId = new AllocationID(); } while (randomAllocationId.equals(allocationId1) || randomAllocationId.equals(allocationId2)); - offerSlot(slotPool, randomAllocationId); + registerAndOfferSlot(taskManagerLocation, slotPool, randomAllocationId); assertThat(slotPool.getPendingRequests().values(), hasSize(1)); assertThat(canceledAllocations, contains(allocationId1)); @@ -745,7 +695,7 @@ public void testOrphanedAllocationIsCanceledIfNotRemapped() throws Exception { */ @Test public void testSlotsOfferedWithoutResourceManagerConnected() throws Exception { - try (SlotPoolImpl slotPool = createSlotPoolImpl()) { + try (SlotPoolImpl slotPool = new TestingSlotPoolImpl(new JobID())) { slotPool.start(JobMasterId.generate(), "mock-address", mainThreadExecutor); final SlotRequestId slotRequestId = new SlotRequestId(); @@ -754,7 +704,7 @@ public void testSlotsOfferedWithoutResourceManagerConnected() throws Exception { assertThat(slotPool.getWaitingForResourceManager().values(), hasSize(1)); final AllocationID allocationId = new AllocationID(); - offerSlot(slotPool, allocationId); + registerAndOfferSlot(taskManagerLocation, slotPool, allocationId); assertThat(slotPool.getWaitingForResourceManager().values(), hasSize(0)); assertThat(slotFuture.isDone(), is(true)); @@ -763,22 +713,43 @@ public void testSlotsOfferedWithoutResourceManagerConnected() throws Exception { } } - private void requestNewAllocatedSlots(final SlotPool slotPool, final SlotRequestId... slotRequestIds) { - for (SlotRequestId slotRequestId : slotRequestIds) { - requestNewAllocatedSlot(slotPool, slotRequestId); - } + private static TestingSlotPoolImpl createAndSetUpSlotPool( + final ResourceManagerGateway resourceManagerGateway) throws Exception { + + return new SlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway).build(); } - private CompletableFuture requestNewAllocatedSlot( - final SlotPool slotPool, - final SlotRequestId slotRequestId) { - return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout); + private static TestingSlotPoolImpl createAndSetUpSlotPool( + final ResourceManagerGateway resourceManagerGateway, + final JobID jobId) throws Exception { + + return new SlotPoolBuilder(mainThreadExecutor) + .setResourceManagerGateway(resourceManagerGateway) + .setJobId(jobId) + .build(); } - private void offerSlot(final SlotPoolImpl slotPool, final AllocationID allocationId) { - final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.ANY); + private static TestingSlotPoolImpl createAndSetUpSlotPool( + final ResourceManagerGateway resourceManagerGateway, + final Clock clock, + final Time idleSlotTimeout) throws Exception { + + return new SlotPoolBuilder(mainThreadExecutor) + .setResourceManagerGateway(resourceManagerGateway) + .setClock(clock) + .setIdleSlotTimeout(idleSlotTimeout) + .build(); + } + + private boolean registerAndOfferSlot( + final TaskManagerLocation taskManagerLocation, + final SlotPoolImpl slotPool, + final AllocationID allocationId) { + slotPool.registerTaskManager(taskManagerLocation.getResourceID()); - slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer); + + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.ANY); + return slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer); } private List registerAndOfferSlots(TaskManagerLocation taskManagerLocation, SlotPoolImpl slotPool, int numberOfSlotsToRegister) { @@ -834,21 +805,4 @@ protected boolean matchesSafely(AllocatedSlotInfo item, Description mismatchDesc } }; } - - private SlotPoolImpl createAndSetUpSlotPool() throws Exception { - final SlotPoolImpl slotPool = createSlotPoolImpl(); - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - return slotPool; - } - - private static void setupSlotPool( - SlotPoolImpl slotPool, - ResourceManagerGateway resourceManagerGateway, - ComponentMainThreadExecutor mainThreadExecutable) throws Exception { - final String jobManagerAddress = "foobar"; - - slotPool.start(JobMasterId.generate(), jobManagerAddress, mainThreadExecutable); - - slotPool.connectToResourceManager(resourceManagerGateway); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java index 779c2f5e7af04..055988aea56e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java @@ -18,36 +18,28 @@ package org.apache.flink.runtime.jobmaster.slotpool; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; -import org.apache.flink.util.clock.SystemClock; import org.junit.ClassRule; import org.junit.Test; -import javax.annotation.Nonnull; - import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -72,17 +64,8 @@ public class SlotPoolInteractionsTest extends TestLogger { @Test public void testSlotAllocationNoResourceManager() throws Exception { - final JobID jid = new JobID(); - - try (SlotPool pool = new SlotPoolImpl( - jid, - SystemClock.getInstance(), - TestingUtils.infiniteTime(), - TestingUtils.infiniteTime(), - TestingUtils.infiniteTime() - )) { - pool.start(JobMasterId.generate(), "foobar", testMainThreadExecutor.getMainThreadExecutor()); + try (SlotPool pool = createAndSetUpSlotPoolWithoutResourceManager()) { final CompletableFuture future = testMainThreadExecutor.execute(() -> pool.requestNewAllocatedSlot( new SlotRequestId(), @@ -100,17 +83,14 @@ public void testSlotAllocationNoResourceManager() throws Exception { @Test public void testCancelSlotAllocationWithoutResourceManager() throws Exception { - final JobID jid = new JobID(); - try (TestingSlotPoolImpl pool = createTestingSlotPool(jid)) { + try (TestingSlotPoolImpl pool = createAndSetUpSlotPoolWithoutResourceManager()) { final CompletableFuture timeoutFuture = new CompletableFuture<>(); pool.setTimeoutPendingSlotRequestConsumer(timeoutFuture::complete); - pool.start(JobMasterId.generate(), "foobar", testMainThreadExecutor.getMainThreadExecutor()); - SlotRequestId requestId = new SlotRequestId(); final CompletableFuture future = testMainThreadExecutor.execute(() -> pool.requestNewAllocatedSlot( - requestId, + new SlotRequestId(), ResourceProfile.UNKNOWN, fastTimeout)); @@ -128,36 +108,19 @@ public void testCancelSlotAllocationWithoutResourceManager() throws Exception { } } - @Nonnull - private TestingSlotPoolImpl createTestingSlotPool(JobID jid) { - return new TestingSlotPoolImpl( - jid, - SystemClock.getInstance(), - TestingUtils.infiniteTime(), - TestingUtils.infiniteTime(), - TestingUtils.infiniteTime()); - } - /** * Tests that a slot allocation times out wrt to the specified time out. */ @Test public void testSlotAllocationTimeout() throws Exception { - final JobID jid = new JobID(); - - try (TestingSlotPoolImpl pool = createTestingSlotPool(jid)) { - pool.start(JobMasterId.generate(), "foobar", testMainThreadExecutor.getMainThreadExecutor()); + try (TestingSlotPoolImpl pool = createAndSetUpSlotPool()) { final CompletableFuture slotRequestTimeoutFuture = new CompletableFuture<>(); pool.setTimeoutPendingSlotRequestConsumer(slotRequestTimeoutFuture::complete); - ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); - pool.connectToResourceManager(resourceManagerGateway); - - SlotRequestId requestId = new SlotRequestId(); final CompletableFuture future = testMainThreadExecutor.execute(() -> pool.requestNewAllocatedSlot( - requestId, + new SlotRequestId(), ResourceProfile.UNKNOWN, fastTimeout)); @@ -180,26 +143,21 @@ public void testSlotAllocationTimeout() throws Exception { */ @Test public void testExtraSlotsAreKept() throws Exception { - final JobID jid = new JobID(); + final CompletableFuture allocationIdFuture = new CompletableFuture<>(); - try (TestingSlotPoolImpl pool = createTestingSlotPool(jid)) { + TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + resourceManagerGateway.setRequestSlotConsumer( + (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId())); - pool.start(JobMasterId.generate(), "foobar", testMainThreadExecutor.getMainThreadExecutor()); - - final CompletableFuture allocationIdFuture = new CompletableFuture<>(); - - TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); - resourceManagerGateway.setRequestSlotConsumer( - (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId())); + try (TestingSlotPoolImpl pool = new SlotPoolBuilder(testMainThreadExecutor.getMainThreadExecutor()) + .setResourceManagerGateway(resourceManagerGateway) + .build()) { final CompletableFuture slotRequestTimeoutFuture = new CompletableFuture<>(); pool.setTimeoutPendingSlotRequestConsumer(slotRequestTimeoutFuture::complete); - pool.connectToResourceManager(resourceManagerGateway); - - SlotRequestId requestId = new SlotRequestId(); final CompletableFuture future = testMainThreadExecutor.execute(() -> pool.requestNewAllocatedSlot( - requestId, + new SlotRequestId(), ResourceProfile.UNKNOWN, fastTimeout)); @@ -219,7 +177,7 @@ public void testExtraSlotsAreKept() throws Exception { final SlotOffer slotOffer = new SlotOffer( allocationId, 0, - DEFAULT_TESTING_PROFILE); + ResourceProfile.ANY); final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); @@ -230,4 +188,14 @@ public void testExtraSlotsAreKept() throws Exception { assertTrue(pool.containsAvailableSlot(allocationId)); } } + + private TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception { + return new SlotPoolBuilder(testMainThreadExecutor.getMainThreadExecutor()).build(); + } + + private TestingSlotPoolImpl createAndSetUpSlotPoolWithoutResourceManager() throws Exception { + return new SlotPoolBuilder(testMainThreadExecutor.getMainThreadExecutor()) + .setResourceManagerGateway(null) + .build(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java index c1361215d31ec..e9e386cdcbc30 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.jobmaster.slotpool; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -26,7 +25,6 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; @@ -51,6 +49,8 @@ import java.util.concurrent.TimeoutException; import java.util.function.Function; +import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils.createAndSetUpSlotPool; +import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils.requestNewAllocatedSlot; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -63,11 +63,6 @@ */ public class SlotPoolPendingRequestFailureTest extends TestLogger { - private static final JobID jobId = new JobID(); - - private static final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread(); - public static final Time TIMEOUT = Time.seconds(10L); - private TestingResourceManagerGateway resourceManagerGateway; @Before @@ -83,7 +78,7 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception { final CompletableFuture allocationIdFuture = new CompletableFuture<>(); resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId())); - try (SlotPoolImpl slotPool = setUpSlotPool()) { + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { final CompletableFuture slotFuture = requestNewAllocatedSlot(slotPool, new SlotRequestId()); @@ -110,7 +105,7 @@ public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Excep final List allocations = new ArrayList<>(); resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId())); - try (SlotPoolImpl slotPool = setUpSlotPool()) { + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { final CompletableFuture slotFuture1 = requestNewAllocatedSlot(slotPool, new SlotRequestId()); final CompletableFuture slotFuture2 = requestNewAllocatedSlot(slotPool, new SlotRequestId()); @@ -148,7 +143,7 @@ public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Excep @Test public void testFailingResourceManagerRequestFailsPendingSlotRequestAndCancelsRMRequest() throws Exception { - try (SlotPoolImpl slotPool = setUpSlotPool()) { + try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) { final CompletableFuture requestSlotFuture = new CompletableFuture<>(); final CompletableFuture cancelSlotFuture = new CompletableFuture<>(); final CompletableFuture requestSlotFutureAllocationId = new CompletableFuture<>(); @@ -181,7 +176,9 @@ public void testPendingSlotRequestTimeout() throws Exception { final ScheduledExecutorService singleThreadExecutor = Executors.newSingleThreadScheduledExecutor(); final ComponentMainThreadExecutor componentMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadExecutor); - final SlotPoolImpl slotPool = setUpSlotPool(componentMainThreadExecutor); + final SlotPoolImpl slotPool = new SlotPoolBuilder(componentMainThreadExecutor) + .setResourceManagerGateway(resourceManagerGateway) + .build(); try { final Time timeout = Time.milliseconds(5L); @@ -202,24 +199,4 @@ public void testPendingSlotRequestTimeout() throws Exception { } } - private CompletableFuture requestNewAllocatedSlot(SlotPoolImpl slotPool, SlotRequestId slotRequestId) { - return requestNewAllocatedSlot(slotPool, slotRequestId, TIMEOUT); - } - - private CompletableFuture requestNewAllocatedSlot(SlotPoolImpl slotPool, SlotRequestId slotRequestId, Time timeout) { - return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout); - } - - private SlotPoolImpl setUpSlotPool() throws Exception { - return setUpSlotPool(mainThreadExecutor); - } - - private SlotPoolImpl setUpSlotPool(ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception { - final SlotPoolImpl slotPool = new TestingSlotPoolImpl(jobId); - slotPool.start(JobMasterId.generate(), "foobar", componentMainThreadExecutor); - slotPool.connectToResourceManager(resourceManagerGateway); - - return slotPool; - } - } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java index 4411518e63609..04f61e72dbc53 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java @@ -18,13 +18,10 @@ package org.apache.flink.runtime.jobmaster.slotpool; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; @@ -58,7 +55,7 @@ */ public class SlotPoolRequestCompletionTest extends TestLogger { - private static final Time TIMEOUT = Time.seconds(10L); + private static final Time TIMEOUT = SlotPoolUtils.TIMEOUT; private TestingResourceManagerGateway resourceManagerGateway; @@ -73,7 +70,7 @@ public void setUp() throws Exception { @Test public void testRequestsAreCompletedInRequestOrder() { runSlotRequestCompletionTest( - CheckedSupplier.unchecked(this::setUpSlotPoolAndConnectToResourceManager), + CheckedSupplier.unchecked(this::createAndSetUpSlotPool), slotPool -> {}); } @@ -83,7 +80,7 @@ public void testRequestsAreCompletedInRequestOrder() { @Test public void testStashOrderMaintainsRequestOrder() { runSlotRequestCompletionTest( - CheckedSupplier.unchecked(this::setUpSlotPool), + CheckedSupplier.unchecked(this::createAndSetUpSlotPoolWithoutResourceManager), this::connectToResourceManager); } @@ -129,22 +126,15 @@ private void runSlotRequestCompletionTest( } } - private SlotPoolImpl setUpSlotPoolAndConnectToResourceManager() throws Exception { - final SlotPoolImpl slotPool = setUpSlotPool(); - connectToResourceManager(slotPool); - - return slotPool; + private TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception { + return SlotPoolUtils.createAndSetUpSlotPool(resourceManagerGateway); } private void connectToResourceManager(SlotPoolImpl slotPool) { slotPool.connectToResourceManager(resourceManagerGateway); } - private SlotPoolImpl setUpSlotPool() throws Exception { - final SlotPoolImpl slotPool = new TestingSlotPoolImpl(new JobID()); - - slotPool.start(JobMasterId.generate(), "foobar", ComponentMainThreadExecutorServiceAdapter.forMainThread()); - - return slotPool; + private TestingSlotPoolImpl createAndSetUpSlotPoolWithoutResourceManager() throws Exception { + return SlotPoolUtils.createAndSetUpSlotPool(null); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java index 38368816fa9bf..cd23c9686561f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolUtils.java @@ -18,18 +18,23 @@ package org.apache.flink.runtime.jobmaster.slotpool; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.FlinkException; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -45,14 +50,45 @@ */ public class SlotPoolUtils { + public static final Time TIMEOUT = Time.seconds(10L); + private SlotPoolUtils() { throw new UnsupportedOperationException("Cannot instantiate this class."); } + static TestingSlotPoolImpl createAndSetUpSlotPool( + @Nullable final ResourceManagerGateway resourceManagerGateway) throws Exception { + + return new SlotPoolBuilder(ComponentMainThreadExecutorServiceAdapter.forMainThread()) + .setResourceManagerGateway(resourceManagerGateway).build(); + } + + static CompletableFuture requestNewAllocatedSlot( + final SlotPool slotPool, + final SlotRequestId slotRequestId) { + + return requestNewAllocatedSlot(slotPool, slotRequestId, TIMEOUT); + } + + static CompletableFuture requestNewAllocatedSlot( + final SlotPool slotPool, + final SlotRequestId slotRequestId, + final Time timeout) { + + return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout); + } + + static void requestNewAllocatedSlots(final SlotPool slotPool, final SlotRequestId... slotRequestIds) { + for (SlotRequestId slotRequestId : slotRequestIds) { + requestNewAllocatedSlot(slotPool, slotRequestId); + } + } + public static CompletableFuture requestNewAllocatedBatchSlot( SlotPool slotPool, ComponentMainThreadExecutor mainThreadExecutor, ResourceProfile resourceProfile) { + return CompletableFuture .supplyAsync(() -> slotPool.requestNewAllocatedBatchSlot(new SlotRequestId(), resourceProfile), mainThreadExecutor) .thenCompose(Function.identity());