diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java index b0182e33ead458..f1fd68af60771a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java @@ -137,16 +137,9 @@ public void testAllocateSimpleSlot() throws Exception { assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); PhysicalSlot physicalSlot = future.get(1, TimeUnit.SECONDS); - LogicalSlot logicalSlot = SingleLogicalSlot.allocateFromPhysicalSlot( - requestId, - physicalSlot, - Locality.UNKNOWN, - new TestSlotOwner(), - true - ); assertTrue(future.isDone()); - assertTrue(logicalSlot.isAlive()); - assertEquals(taskManagerLocation, logicalSlot.getTaskManagerLocation()); + assertEquals(physicalSlot.getTaskManagerLocation(), taskManagerLocation); + assertEquals(physicalSlot.getAllocationId(), slotRequest.getAllocationId()); } } @@ -165,21 +158,19 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception { } }); - try (SlotPoolImpl slotPool = createSlotPoolImpl()) { - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); + try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { slotPool.registerTaskManager(taskManagerLocation.getResourceID()); - CompletableFuture future1 = scheduler.allocateSlot( - new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - timeout); - CompletableFuture future2 = scheduler.allocateSlot( - new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - timeout); + SlotRequestId requestId1 = new SlotRequestId(); + CompletableFuture future1 = requestNewAllocatedSlot( + slotPool, + requestId1 + ); + SlotRequestId requestId2 = new SlotRequestId(); + CompletableFuture future2 = requestNewAllocatedSlot( + slotPool, + requestId2 + ); assertFalse(future1.isDone()); assertFalse(future2.isDone()); @@ -197,23 +188,18 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception { assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); - LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); + PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); assertFalse(future2.isDone()); // return this slot to pool - slot1.releaseSlot(); + slotPool.releaseSlot(requestId1, null); // second allocation fulfilled by previous slot returning - LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS); + PhysicalSlot slot2 = future2.get(1, TimeUnit.SECONDS); assertTrue(future2.isDone()); - assertNotEquals(slot1, slot2); - assertFalse(slot1.isAlive()); - assertTrue(slot2.isAlive()); - assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation()); - assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber()); - assertEquals(slot1.getAllocationId(), slot2.getAllocationId()); + assertEquals(slot1, slot2); } } @@ -222,16 +208,14 @@ public void testAllocateWithFreeSlot() throws Exception { final CompletableFuture slotRequestFuture = new CompletableFuture<>(); resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete); - try (SlotPoolImpl slotPool = createSlotPoolImpl()) { - setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); - Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); + try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) { slotPool.registerTaskManager(taskManagerLocation.getResourceID()); - CompletableFuture future1 = scheduler.allocateSlot( - new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - timeout); + SlotRequestId requestId1 = new SlotRequestId(); + CompletableFuture future1 = requestNewAllocatedSlot( + slotPool, + requestId1 + ); assertFalse(future1.isDone()); final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -243,27 +227,22 @@ public void testAllocateWithFreeSlot() throws Exception { assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer)); - LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); + PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS); assertTrue(future1.isDone()); // return this slot to pool - slot1.releaseSlot(); + slotPool.releaseSlot(requestId1, null); - CompletableFuture future2 = scheduler.allocateSlot( - new SlotRequestId(), - new DummyScheduledUnit(), - SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - timeout); + CompletableFuture future2 = requestNewAllocatedSlot( + slotPool, + new SlotRequestId() + ); // second allocation fulfilled by previous slot returning - LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS); + PhysicalSlot slot2 = future2.get(1, TimeUnit.SECONDS); assertTrue(future2.isDone()); - assertNotEquals(slot1, slot2); - assertFalse(slot1.isAlive()); - assertTrue(slot2.isAlive()); - assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation()); - assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber()); + assertEquals(slot1, slot2); } } @@ -679,7 +658,7 @@ public void testCreateAllocatedSlotReport() throws Exception { final ArrayBlockingQueue allocationIds = new ArrayBlockingQueue<>(1); resourceManagerGateway.setRequestSlotConsumer( - slotRequest -> allocationIds.offer(slotRequest.getAllocationId())); + slotRequest -> allocationIds.offer(slotRequest.getAllocationId())); setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor); Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor); @@ -826,8 +805,8 @@ private void requestNewAllocatedSlots(final SlotPool slotPool, final SlotRequest } private CompletableFuture requestNewAllocatedSlot( - final SlotPool slotPool, - final SlotRequestId slotRequestId) { + final SlotPool slotPool, + final SlotRequestId slotRequestId) { return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout); } @@ -923,10 +902,4 @@ private static Scheduler setupScheduler( scheduler.start(mainThreadExecutable); return scheduler; } - - private class TestSlotOwner implements SlotOwner { - @Override - public void returnLogicalSlot(LogicalSlot logicalSlot) { - } - } }