Skip to content

Commit

Permalink
[TEST] Try to remove the SchedulerImpl in testAllocateSimpleSlot
Browse files Browse the repository at this point in the history
  • Loading branch information
Thesharing committed Jul 17, 2020
1 parent 9fa0a99 commit 6031423
Showing 1 changed file with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,9 @@
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.*;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
Expand Down Expand Up @@ -123,17 +120,11 @@ public void testAllocateSimpleSlot() throws Exception {
CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
resourceManagerGateway.setRequestSlotConsumer(slotRequestFuture::complete);

try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
try (SlotPoolImpl slotPool = createAndSetUpSlotPool()) {
slotPool.registerTaskManager(taskManagerLocation.getResourceID());

SlotRequestId requestId = new SlotRequestId();
CompletableFuture<LogicalSlot> future = scheduler.allocateSlot(
requestId,
new DummyScheduledUnit(),
SlotProfile.noLocality(DEFAULT_TESTING_PROFILE),
timeout);
CompletableFuture<PhysicalSlot> future = requestNewAllocatedSlot(slotPool, requestId);
assertFalse(future.isDone());

final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
Expand All @@ -145,10 +136,17 @@ public void testAllocateSimpleSlot() throws Exception {

assertTrue(slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer));

LogicalSlot slot = future.get(1, TimeUnit.SECONDS);
PhysicalSlot physicalSlot = future.get(1, TimeUnit.SECONDS);
LogicalSlot logicalSlot = SingleLogicalSlot.allocateFromPhysicalSlot(
requestId,
physicalSlot,
Locality.UNKNOWN,
new TestSlotOwner(),
true
);
assertTrue(future.isDone());
assertTrue(slot.isAlive());
assertEquals(taskManagerLocation, slot.getTaskManagerLocation());
assertTrue(logicalSlot.isAlive());
assertEquals(taskManagerLocation, logicalSlot.getTaskManagerLocation());
}
}

Expand Down Expand Up @@ -925,4 +923,10 @@ private static Scheduler setupScheduler(
scheduler.start(mainThreadExecutable);
return scheduler;
}

private class TestSlotOwner implements SlotOwner {
@Override
public void returnLogicalSlot(LogicalSlot logicalSlot) {
}
}
}

0 comments on commit 6031423

Please sign in to comment.