Skip to content

Commit

Permalink
[fix-up][tests] Refactor SlotPoolImplTest with SlotPoolTestBase
Browse files Browse the repository at this point in the history
  • Loading branch information
Thesharing committed Jul 30, 2020
1 parent 084c389 commit da7fb96
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 48 deletions.
Expand Up @@ -91,9 +91,9 @@ public class SlotPoolImplTest extends SlotPoolTestBase {
@Before
@Override
public void setup() throws Exception {
super.setup();
taskManagerLocation = new LocalTaskManagerLocation();
taskManagerGateway = new SimpleAckingTaskManagerGateway();
super.setup();
}

@Test
Expand All @@ -102,20 +102,14 @@ public void testAllocateSimpleSlot() throws Exception {
resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());

SlotRequestId requestId = new SlotRequestId();
final CompletableFuture<PhysicalSlot> future = requestNewAllocatedSlot(slotPool, requestId);
assertFalse(future.isDone());

final SlotRequest slotRequest = slotRequestFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);

final SlotOffer slotOffer = new SlotOffer(
slotRequest.getAllocationId(),
0,
DEFAULT_TESTING_PROFILE);

assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer));
assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, slotRequest.getAllocationId()));

final PhysicalSlot physicalSlot = future.get(1, TimeUnit.SECONDS);
assertTrue(future.isDone());
Expand All @@ -135,7 +129,6 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception {
});

try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());

SlotRequestId requestId1 = new SlotRequestId();
CompletableFuture<PhysicalSlot> future1 = requestNewAllocatedSlot(
Expand All @@ -155,12 +148,7 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception {
slotRequests.add(slotRequestQueue.poll(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS));
}

final SlotOffer slotOffer = new SlotOffer(
slotRequests.get(0).getAllocationId(),
0,
DEFAULT_TESTING_PROFILE);

assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer));
assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, slotRequests.get(0).getAllocationId()));

PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
Expand All @@ -183,15 +171,9 @@ public void testAllocateWithFreeSlot() throws Exception {
resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());

AllocationID allocationID = new AllocationID();
final SlotOffer slotOffer = new SlotOffer(
allocationID,
0,
DEFAULT_TESTING_PROFILE);

assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer));
assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, allocationID));

assertEquals(1, slotPool.getAvailableSlots().size());
assertEquals(0, slotPool.getAllocatedSlots().size());
Expand All @@ -213,7 +195,7 @@ public void testOfferSlot() throws Exception {
resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
registerTaskManager(slotPool, taskManagerLocation);

SlotRequestId requestId = new SlotRequestId();
CompletableFuture<PhysicalSlot> future = requestNewAllocatedSlot(
Expand Down Expand Up @@ -278,7 +260,6 @@ public void testReleaseResource() throws Exception {
resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());

SlotRequestId requestId1 = new SlotRequestId();
CompletableFuture<PhysicalSlot> future1 = requestNewAllocatedSlot(
Expand All @@ -291,12 +272,7 @@ public void testReleaseResource() throws Exception {
slotPool,
new SlotRequestId());

final SlotOffer slotOffer = new SlotOffer(
slotRequest.getAllocationId(),
0,
DEFAULT_TESTING_PROFILE);

assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer));
assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, slotRequest.getAllocationId()));

final PhysicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
Expand Down Expand Up @@ -371,11 +347,7 @@ public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception

assertEquals(allocationId1, canceledAllocations.take());

final SlotOffer slotOffer = new SlotOffer(allocationId1, 0, ResourceProfile.ANY);

slotPool.registerTaskManager(taskManagerLocation.getResourceID());

assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer));
assertTrue(registerAndOfferSlot(taskManagerLocation, slotPool, allocationId1));

// the slot offer should fulfill the second slot request
assertEquals(allocationId1, slotFuture2.get().getAllocationId());
Expand All @@ -392,7 +364,7 @@ public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception
public void testShutdownReleasesAllSlots() throws Exception {

try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
registerTaskManager(slotPool, taskManagerLocation);

final int numSlotOffers = 2;

Expand Down Expand Up @@ -457,7 +429,7 @@ public void testCheckIdleSlot() throws Exception {
final SlotOffer slotToExpire = new SlotOffer(expiredSlotID, 0, ResourceProfile.ANY);
final SlotOffer slotToNotExpire = new SlotOffer(freshSlotID, 1, ResourceProfile.ANY);

assertThat(slotPool.registerTaskManager(taskManagerLocation.getResourceID()),
assertThat(registerTaskManager(slotPool, taskManagerLocation),
Matchers.is(true));

assertThat(
Expand Down Expand Up @@ -497,7 +469,7 @@ public void testDiscardIdleSlotIfReleasingFailed() throws Exception {
return FutureUtils.completedExceptionally(new TimeoutException("Test failure"));
});

assertThat(slotPool.registerTaskManager(taskManagerLocation.getResourceID()), Matchers.is(true));
assertThat(registerTaskManager(slotPool, taskManagerLocation), Matchers.is(true));

assertThat(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotToExpire), Matchers.is(true));

Expand Down Expand Up @@ -547,7 +519,7 @@ public void testFreeFailedSlots() throws Exception {
slotOffers.add(new SlotOffer(allocationIds.take(), i, ResourceProfile.ANY));
}

slotPool.registerTaskManager(taskManagerLocation.getResourceID());
registerTaskManager(slotPool, taskManagerLocation);
slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);

// wait for the completion of both slot futures
Expand Down Expand Up @@ -610,7 +582,7 @@ public void testCreateAllocatedSlotReport() throws Exception {
slotOffers.add(new SlotOffer(availableId, 1, ResourceProfile.ANY));
allocatedSlotInfos.add(new AllocatedSlotInfo(1, availableId));

slotPool.registerTaskManager(taskManagerLocation.getResourceID());
registerTaskManager(slotPool, taskManagerLocation);
slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);

// wait for the completion of slot future
Expand Down Expand Up @@ -687,7 +659,6 @@ public void testOrphanedAllocationIsCanceledIfNotRemapped() throws Exception {
resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);

try (SlotPoolImpl slotPool = createAndSetUpSlotPool(resourceManagerGateway)) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());

final SlotRequestId slotRequestId1 = new SlotRequestId();
final SlotRequestId slotRequestId2 = new SlotRequestId();
Expand Down Expand Up @@ -737,15 +708,15 @@ private TestingSlotPoolImpl createAndSetUpSlotPool(Clock clock, Time idleSlotTim
return slotPoolBuilder.setClock(clock).setIdleSlotTimeout(idleSlotTimeout).build();
}

private void registerAndOfferSlot(
private boolean registerAndOfferSlot(
final TaskManagerLocation taskManagerLocation,
final SlotPoolImpl slotPool,
final AllocationID allocationId) {

registerTaskManager(slotPool, taskManagerLocation);

final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.ANY);
slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer);
return slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer);
}

private List<AllocationID> registerAndOfferSlots(
Expand Down
Expand Up @@ -33,22 +33,22 @@ public void setup() throws Exception {
slotPoolBuilder = new SlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
}

protected TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception {
TestingSlotPoolImpl createAndSetUpSlotPool() throws Exception {
return slotPoolBuilder.build();
}

protected TestingSlotPoolImpl createAndSetUpSlotPool(final JobID jobId) throws Exception {
TestingSlotPoolImpl createAndSetUpSlotPool(final JobID jobId) throws Exception {
return slotPoolBuilder.setJobId(jobId).build();
}

protected TestingSlotPoolImpl createAndSetUpSlotPool(
TestingSlotPoolImpl createAndSetUpSlotPool(
final ResourceManagerGateway resourceManagerGateway) throws Exception {

return slotPoolBuilder.setResourceManagerGateway(resourceManagerGateway).build();
}

static void registerTaskManager(final SlotPool slotPool, final TaskManagerLocation taskManagerLocation) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());
static boolean registerTaskManager(final SlotPool slotPool, final TaskManagerLocation taskManagerLocation) {
return slotPool.registerTaskManager(taskManagerLocation.getResourceID());
}

static void requestNewAllocatedSlots(final SlotPool slotPool, final SlotRequestId... slotRequestIds) {
Expand Down

0 comments on commit da7fb96

Please sign in to comment.