Skip to content

Commit

Permalink
Try to modify testAllocationFulfilledByReturnedSlot and testAllocateW…
Browse files Browse the repository at this point in the history
…ithFreeSlot
  • Loading branch information
Thesharing committed Jul 17, 2020
1 parent 6031423 commit e9db5e6
Showing 1 changed file with 34 additions and 61 deletions.
Expand Up @@ -137,16 +137,9 @@ public void testAllocateSimpleSlot() throws Exception {
assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer));

PhysicalSlot physicalSlot = future.get(1, TimeUnit.SECONDS);
LogicalSlot logicalSlot = SingleLogicalSlot.allocateFromPhysicalSlot(
requestId,
physicalSlot,
Locality.UNKNOWN,
new TestSlotOwner(),
true
);
assertTrue(future.isDone());
assertTrue(logicalSlot.isAlive());
assertEquals(taskManagerLocation, logicalSlot.getTaskManagerLocation());
assertEquals(physicalSlot.getTaskManagerLocation(), taskManagerLocation);
assertEquals(physicalSlot.getAllocationId(), slotRequest.getAllocationId());
}
}

Expand All @@ -165,21 +158,19 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception {
}
});

try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());

CompletableFuture<LogicalSlot> future1 = scheduler.allocateSlot(
new SlotRequestId(),
new DummyScheduledUnit(),
SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
timeout);
CompletableFuture<LogicalSlot> future2 = scheduler.allocateSlot(
new SlotRequestId(),
new DummyScheduledUnit(),
SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
timeout);
SlotRequestId requestId1 = new SlotRequestId();
CompletableFuture<PhysicalSlot> future1 = requestNewAllocatedSlot(
slotPool,
requestId1
);
SlotRequestId requestId2 = new SlotRequestId();
CompletableFuture<PhysicalSlot> future2 = requestNewAllocatedSlot(
slotPool,
requestId2
);

assertFalse(future1.isDone());
assertFalse(future2.isDone());
Expand All @@ -197,23 +188,18 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception {

assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer));

LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
assertFalse(future2.isDone());

// return this slot to pool
slot1.releaseSlot();
slotPool.releaseSlot(requestId1, null);

// second allocation fulfilled by previous slot returning
LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
PhysicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
assertTrue(future2.isDone());

assertNotEquals(slot1, slot2);
assertFalse(slot1.isAlive());
assertTrue(slot2.isAlive());
assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
assertEquals(slot1.getAllocationId(), slot2.getAllocationId());
assertEquals(slot1, slot2);
}
}

Expand All @@ -222,16 +208,14 @@ public void testAllocateWithFreeSlot() throws Exception {
final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());

CompletableFuture<LogicalSlot> future1 = scheduler.allocateSlot(
new SlotRequestId(),
new DummyScheduledUnit(),
SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
timeout);
SlotRequestId requestId1 = new SlotRequestId();
CompletableFuture<PhysicalSlot> future1 = requestNewAllocatedSlot(
slotPool,
requestId1
);
assertFalse(future1.isDone());

final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
Expand All @@ -243,27 +227,22 @@ public void testAllocateWithFreeSlot() throws Exception {

assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer));

LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());

// return this slot to pool
slot1.releaseSlot();
slotPool.releaseSlot(requestId1, null);

CompletableFuture<LogicalSlot> future2 = scheduler.allocateSlot(
new SlotRequestId(),
new DummyScheduledUnit(),
SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
timeout);
CompletableFuture<PhysicalSlot> future2 = requestNewAllocatedSlot(
slotPool,
new SlotRequestId()
);

// second allocation fulfilled by previous slot returning
LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
PhysicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
assertTrue(future2.isDone());

assertNotEquals(slot1, slot2);
assertFalse(slot1.isAlive());
assertTrue(slot2.isAlive());
assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
assertEquals(slot1, slot2);
}
}

Expand Down Expand Up @@ -679,7 +658,7 @@ public void testCreateAllocatedSlotReport() throws Exception {

final ArrayBlockingQueue<AllocationID> allocationIds = new ArrayBlockingQueue<>(1);
resourceManagerGateway.setRequestSlotConsumer(
slotRequest -> allocationIds.offer(slotRequest.getAllocationId()));
slotRequest -> allocationIds.offer(slotRequest.getAllocationId()));

setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
Expand Down Expand Up @@ -826,8 +805,8 @@ private void requestNewAllocatedSlots(final SlotPool slotPool, final SlotRequest
}

private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
final SlotPool slotPool,
final SlotRequestId slotRequestId) {
final SlotPool slotPool,
final SlotRequestId slotRequestId) {
return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout);
}

Expand Down Expand Up @@ -923,10 +902,4 @@ private static Scheduler setupScheduler(
scheduler.start(mainThreadExecutable);
return scheduler;
}

private class TestSlotOwner implements SlotOwner {
@Override
public void returnLogicalSlot(LogicalSlot logicalSlot) {
}
}
}

0 comments on commit e9db5e6

Please sign in to comment.