diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java index 389641528ab919..2a493b7a84c612 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java @@ -24,18 +24,15 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit; -import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner; import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo; import org.apache.flink.runtime.jobmaster.AllocatedSlotReport; import org.apache.flink.runtime.jobmaster.JobMasterId; -import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; @@ -87,7 +84,6 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -123,17 +119,11 @@ public void testAllocateSimpleSlot() throws Exception { CompletableFuture slotRequestFuture = new CompletableFuture<>(); resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete); - try (SlotPoolImpl slotPool = createSlotPoolImpl()) { - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); + try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { slotPool.registerTaskManager(taskManagerLocation.getResourceID()); SlotRequestId requestId = new SlotRequestId(); - CompletableFuture future = scheduler.allocateSlot( - requestId, - new DummyScheduledUnit(), - SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - timeout); + CompletableFuture future = requestNewAllocatedSlot(slotPool, requestId); assertFalse(future.isDone()); final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -145,10 +135,10 @@ public void testAllocateSimpleSlot() throws Exception { assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); - LogicalSlot slot = future.get(1, TimeUnit.SECONDS); + PhysicalSlot physicalSlot = future.get(1, TimeUnit.SECONDS); assertTrue(future.isDone()); - assertTrue(slot.isAlive()); - assertEquals(taskManagerLocation, slot.getTaskManagerLocation()); + assertEquals(taskManagerLocation, physicalSlot.getTaskManagerLocation()); + assertEquals(slotRequest.getAllocationId(), physicalSlot.getAllocationId()); } } @@ -167,21 +157,19 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception { } }); - try (SlotPoolImpl slotPool = createSlotPoolImpl()) { - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); + try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { slotPool.registerTaskManager(taskManagerLocation.getResourceID()); - CompletableFuture future1 = scheduler.allocateSlot( - new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - timeout); - CompletableFuture future2 = scheduler.allocateSlot( - new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - timeout); + SlotRequestId requestId1 = new SlotRequestId(); + CompletableFuture future1 = requestNewAllocatedSlot( + slotPool, + requestId1 + ); + SlotRequestId requestId2 = new SlotRequestId(); + CompletableFuture future2 = requestNewAllocatedSlot( + slotPool, + requestId2 + ); assertFalse(future1.isDone()); assertFalse(future2.isDone()); @@ -199,23 +187,18 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception { assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); - LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); + PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); assertFalse(future2.isDone()); // return this slot to pool - slot1.releaseSlot(); + slotPool.releaseSlot(requestId1, null); // second allocation fulfilled by previous slot returning - LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS); + PhysicalSlot slot2 = future2.get(1, TimeUnit.SECONDS); assertTrue(future2.isDone()); - assertNotEquals(slot1, slot2); - assertFalse(slot1.isAlive()); - assertTrue(slot2.isAlive()); - assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation()); - assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber()); - assertEquals(slot1.getAllocationId(), slot2.getAllocationId()); + assertEquals(slot1, slot2); } } @@ -224,16 +207,14 @@ public void testAllocateWithFreeSlot() throws Exception { final CompletableFuture slotRequestFuture = new CompletableFuture<>(); resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete); - try (SlotPoolImpl slotPool = createSlotPoolImpl()) { - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); + try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { slotPool.registerTaskManager(taskManagerLocation.getResourceID()); - CompletableFuture future1 = scheduler.allocateSlot( - new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - timeout); + SlotRequestId requestId1 = new SlotRequestId(); + CompletableFuture future1 = requestNewAllocatedSlot( + slotPool, + requestId1 + ); assertFalse(future1.isDone()); final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -245,27 +226,25 @@ public void testAllocateWithFreeSlot() throws Exception { assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); - LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); + PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); // return this slot to pool - slot1.releaseSlot(); + slotPool.releaseSlot(requestId1, null); - CompletableFuture future2 = scheduler.allocateSlot( + assertEquals(1, slotPool.getAvailableSlots().size()); + assertEquals(0, slotPool.getAllocatedSlots().size()); + + Optional optional = slotPool.allocateAvailableSlot( new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - timeout); + slotRequest.getAllocationId() + ); // second allocation fulfilled by previous slot returning - LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS); - assertTrue(future2.isDone()); + assertTrue(optional.isPresent()); + PhysicalSlot slot2 = optional.get(); - assertNotEquals(slot1, slot2); - assertFalse(slot1.isAlive()); - assertTrue(slot2.isAlive()); - assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation()); - assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber()); + assertEquals(slot1, slot2); } } @@ -275,16 +254,14 @@ public void testOfferSlot() throws Exception { resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete); - try (SlotPoolImpl slotPool = createSlotPoolImpl()) { - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); + try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { slotPool.registerTaskManager(taskManagerLocation.getResourceID()); - CompletableFuture future = scheduler.allocateSlot( - new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - timeout); + SlotRequestId requestId = new SlotRequestId(); + CompletableFuture future = requestNewAllocatedSlot( + slotPool, + requestId + ); assertFalse(future.isDone()); final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -309,12 +286,16 @@ public void testOfferSlot() throws Exception { // accepted slot assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); - LogicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - assertTrue(slot.isAlive()); + PhysicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + assertEquals(1, slotPool.getAvailableSlots().size()); + assertEquals(1, slotPool.getAllocatedSlots().size()); + assertEquals(taskManagerLocation, slot.getTaskManagerLocation()); + assertEquals(nonRequestedSlotOffer.getAllocationId(), slot.getAllocationId()); // duplicated offer with using slot assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); - assertTrue(slot.isAlive()); + assertEquals(1, slotPool.getAllocatedSlots().size()); + assertEquals(nonRequestedSlotOffer.getAllocationId(), slot.getAllocationId()); final SlotOffer anotherSlotOfferWithSameAllocationId = new SlotOffer( slotRequest.getAllocationId(), @@ -326,7 +307,7 @@ public void testOfferSlot() throws Exception { assertFalse(slotPool.offerSlot(anotherTaskManagerLocation, taskManagerGateway, slotOffer)); // duplicated offer with free slot - slot.releaseSlot(); + slotPool.releaseSlot(requestId, null); assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); assertFalse(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, anotherSlotOfferWithSameAllocationId)); assertFalse(slotPool.offerSlot(anotherTaskManagerLocation, taskManagerGateway, slotOffer)); @@ -339,24 +320,21 @@ public void testReleaseResource() throws Exception { resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete); - try (SlotPoolImpl slotPool = createSlotPoolImpl()) { - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); + try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { slotPool.registerTaskManager(taskManagerLocation.getResourceID()); - CompletableFuture future1 = scheduler.allocateSlot( - new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - timeout); + SlotRequestId requestId1 = new SlotRequestId(); + CompletableFuture future1 = requestNewAllocatedSlot( + slotPool, + requestId1 + ); final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - CompletableFuture future2 = scheduler.allocateSlot( - new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - timeout); + CompletableFuture future2 = requestNewAllocatedSlot( + slotPool, + new SlotRequestId() + ); final SlotOffer slotOffer = new SlotOffer( slotRequest.getAllocationId(), @@ -365,19 +343,26 @@ public void testReleaseResource() throws Exception { assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); - LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); + PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); assertFalse(future2.isDone()); final CompletableFuture releaseFuture = new CompletableFuture<>(); - final DummyPayload dummyPayload = new DummyPayload(releaseFuture); - slot1.tryAssignPayload(dummyPayload); + SingleLogicalSlot logicalSlot = SingleLogicalSlot.allocateFromPhysicalSlot( + requestId1, + slot1, + Locality.UNKNOWN, + new DummySlotOwner(), + true + ); + + logicalSlot.tryAssignPayload(new DummyPayload(releaseFuture)); slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), null); - releaseFuture.get(); - assertFalse(slot1.isAlive()); + releaseFuture.get(1, TimeUnit.SECONDS); + assertFalse(logicalSlot.isAlive()); // slot released and not usable, second allocation still not fulfilled Thread.sleep(10); @@ -396,7 +381,7 @@ public void testReleaseResource() throws Exception { @Test public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception { - try (SlotPoolImpl slotPool = createSlotPoolImpl()) { + try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { final ArrayBlockingQueue allocationIds = new ArrayBlockingQueue<>(2); resourceManagerGateway.setRequestSlotConsumer( (SlotRequest slotRequest) -> allocationIds.offer(slotRequest.getAllocationId())); @@ -404,28 +389,19 @@ public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::offer); final SlotRequestId slotRequestId1 = new SlotRequestId(); final SlotRequestId slotRequestId2 = new SlotRequestId(); - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); - final ScheduledUnit scheduledUnit = new ScheduledUnit( - new JobVertexID(), - null, - null); - - CompletableFuture slotFuture1 = scheduler.allocateSlot( - slotRequestId1, - scheduledUnit, - SlotProfile.noRequirements(), - timeout); + CompletableFuture slotFuture1 = requestNewAllocatedSlot( + slotPool, + slotRequestId1 + ); // wait for the first slot request final AllocationID allocationId1 = allocationIds.take(); - CompletableFuture slotFuture2 = scheduler.allocateSlot( - slotRequestId2, - scheduledUnit, - SlotProfile.noRequirements(), - timeout); + CompletableFuture slotFuture2 = requestNewAllocatedSlot( + slotPool, + slotRequestId2 + ); // wait for the second slot request final AllocationID allocationId2 = allocationIds.take(); @@ -463,9 +439,7 @@ public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception @Test public void testShutdownReleasesAllSlots() throws Exception { - try (SlotPoolImpl slotPool = createSlotPoolImpl()) { - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - + try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { slotPool.registerTaskManager(taskManagerLocation.getResourceID()); final int numSlotOffers = 2; @@ -576,7 +550,6 @@ public void testDiscardIdleSlotIfReleasingFailed() throws Exception { try (TestingSlotPoolImpl slotPool = createSlotPoolImpl(clock)) { setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); final AllocationID expiredAllocationId = new AllocationID(); final SlotOffer slotToExpire = new SlotOffer(expiredAllocationId, 0, ResourceProfile.ANY); @@ -597,11 +570,14 @@ public void testDiscardIdleSlotIfReleasingFailed() throws Exception { freeSlotLatch.await(); - CompletableFuture allocatedSlotFuture = allocateSlot(scheduler, new SlotRequestId()); + CompletableFuture future = requestNewAllocatedSlot( + slotPool, + new SlotRequestId() + ); try { // since the slot must have been discarded, we cannot fulfill the slot request - allocatedSlotFuture.get(10L, TimeUnit.MILLISECONDS); + future.get(10L, TimeUnit.MILLISECONDS); fail("Expected to fail with a timeout."); } catch (TimeoutException ignored) { // expected @@ -616,20 +592,17 @@ public void testDiscardIdleSlotIfReleasingFailed() throws Exception { @Test public void testFreeFailedSlots() throws Exception { - try (SlotPoolImpl slotPool = createSlotPoolImpl()) { + try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { final int parallelism = 5; final ArrayBlockingQueue allocationIds = new ArrayBlockingQueue<>(parallelism); resourceManagerGateway.setRequestSlotConsumer( slotRequest -> allocationIds.offer(slotRequest.getAllocationId())); - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); - - final Map> slotRequestFutures = new HashMap<>(parallelism); + final Map> slotRequestFutures = new HashMap<>(parallelism); for (int i = 0; i < parallelism; i++) { final SlotRequestId slotRequestId = new SlotRequestId(); - slotRequestFutures.put(slotRequestId, allocateSlot(scheduler, slotRequestId)); + slotRequestFutures.put(slotRequestId, requestNewAllocatedSlot(slotPool, slotRequestId)); } final List slotOffers = new ArrayList<>(parallelism); @@ -666,6 +639,8 @@ public void testFreeFailedSlots() throws Exception { final Optional emptyTaskExecutorFuture = slotPool.failAllocation( slotOffer.getAllocationId(), failException); + + assertTrue(emptyTaskExecutorFuture.isPresent()); assertThat(emptyTaskExecutorFuture.get(), is(equalTo(taskManagerLocation.getResourceID()))); assertThat(freedSlots.take(), is(equalTo(slotOffer.getAllocationId()))); } @@ -677,17 +652,17 @@ public void testFreeFailedSlots() throws Exception { @Test public void testCreateAllocatedSlotReport() throws Exception { - try (SlotPoolImpl slotPool = createSlotPoolImpl()) { + try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { final ArrayBlockingQueue allocationIds = new ArrayBlockingQueue<>(1); resourceManagerGateway.setRequestSlotConsumer( - slotRequest -> allocationIds.offer(slotRequest.getAllocationId())); - - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); + slotRequest -> allocationIds.offer(slotRequest.getAllocationId())); final SlotRequestId slotRequestId = new SlotRequestId(); - final CompletableFuture slotRequestFuture = allocateSlot(scheduler, slotRequestId); + final CompletableFuture slotRequestFuture = requestNewAllocatedSlot( + slotPool, + slotRequestId + ); final List allocatedSlotInfos = new ArrayList<>(2); final List slotOffers = new ArrayList<>(2); @@ -704,7 +679,7 @@ public void testCreateAllocatedSlotReport() throws Exception { slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers); // wait for the completion of slot future - slotRequestFuture.get(); + slotRequestFuture.get(1, TimeUnit.SECONDS); final AllocatedSlotReport slotReport = slotPool.createAllocatedSlotReport(taskManagerLocation.getResourceID()); assertThat(jobId, is(slotReport.getJobId())); @@ -714,9 +689,7 @@ public void testCreateAllocatedSlotReport() throws Exception { @Test public void testCalculationOfTaskExecutorUtilization() throws Exception { - try (final SlotPoolImpl slotPool = createSlotPoolImpl()) { - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - + try (final SlotPoolImpl slotPool = createAndSetUpSlotPool()) { final TaskManagerLocation firstTaskManagerLocation = new LocalTaskManagerLocation(); final TaskManagerLocation secondTaskManagerLocation = new LocalTaskManagerLocation(); @@ -893,14 +866,6 @@ protected boolean matchesSafely(AllocatedSlotInfo item, Description mismatchDesc }; } - private CompletableFuture allocateSlot(Scheduler scheduler, SlotRequestId slotRequestId) { - return scheduler.allocateSlot( - slotRequestId, - new DummyScheduledUnit(), - SlotProfile.noRequirements(), - timeout); - } - private SlotPoolImpl createAndSetUpSlotPool() throws Exception { final SlotPoolImpl slotPool = createSlotPoolImpl(); setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); @@ -917,12 +882,4 @@ private static void setupSlotPool( slotPool.connectToResourceManager(resourceManagerGateway); } - - private static Scheduler setupScheduler( - SlotPool slotPool, - ComponentMainThreadExecutor mainThreadExecutable) { - Scheduler scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool); - scheduler.start(mainThreadExecutable); - return scheduler; - } }