From 3a528934242821e9233575a843673e020c060dda Mon Sep 17 00:00:00 2001 From: Thesharing Date: Wed, 22 Jul 2020 17:49:35 +0800 Subject: [PATCH] Refactor the test cases with SlotPoolTestBase --- .../SlotPoolBatchSlotRequestTest.java | 30 ++++------- .../jobmaster/slotpool/SlotPoolImplTest.java | 29 ++--------- .../SlotPoolPendingRequestFailureTest.java | 26 +--------- .../SlotPoolRequestCompletionTest.java | 22 +------- .../jobmaster/slotpool/SlotPoolTestBase.java | 51 +++++++++++++++++++ 5 files changed, 67 insertions(+), 91 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestBase.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java index 2f4089a5a8441..af7b00d80acb4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.java @@ -22,14 +22,11 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.TestLogger; import org.apache.flink.util.clock.ManualClock; import org.junit.Before; @@ -51,21 +48,16 @@ /** * Tests for batch slot requests. */ -public class SlotPoolBatchSlotRequestTest extends TestLogger { - - private static final ComponentMainThreadExecutor mainThreadExecutor = - ComponentMainThreadExecutorServiceAdapter.forMainThread(); +public class SlotPoolBatchSlotRequestTest extends SlotPoolTestBase { private static final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0, 1024); private static final ResourceProfile smallerResourceProfile = ResourceProfile.fromResources(0.5, 512); public static final CompletableFuture[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture[0]; - private SlotPoolBuilder slotPoolBuilder; - private TestingResourceManagerGateway testingResourceManagerGateway; - @Before + @Override public void setup() throws Exception { - testingResourceManagerGateway = new TestingResourceManagerGateway(); + resourceManagerGateway = new TestingResourceManagerGateway(); slotPoolBuilder = new SlotPoolBuilder(mainThreadExecutor); } @@ -133,13 +125,13 @@ public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() th @Test public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exception { final CompletableFuture allocationIdFuture = new CompletableFuture<>(); - testingResourceManagerGateway.setRequestSlotConsumer( + resourceManagerGateway.setRequestSlotConsumer( slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId())); final Time batchSlotTimeout = Time.milliseconds(1000L); try (final SlotPoolImpl slotPool = slotPoolBuilder .setBatchSlotTimeout(batchSlotTimeout) - .setResourceManagerGateway(testingResourceManagerGateway) + .setResourceManagerGateway(resourceManagerGateway) .build()) { final CompletableFuture slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot( @@ -164,11 +156,11 @@ public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exc @Test public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws Exception { final CompletableFuture allocationIdFuture = new CompletableFuture<>(); - testingResourceManagerGateway.setRequestSlotConsumer( + resourceManagerGateway.setRequestSlotConsumer( slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId())); try (final SlotPoolImpl slotPool = slotPoolBuilder - .setResourceManagerGateway(testingResourceManagerGateway) + .setResourceManagerGateway(resourceManagerGateway) .build()) { final CompletableFuture slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot( @@ -191,12 +183,12 @@ public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() thr */ @Test public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exception { - testingResourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally(new FlinkException("Failed request"))); + resourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally(new FlinkException("Failed request"))); final Time batchSlotTimeout = Time.milliseconds(1000L); try (final SlotPoolImpl slotPool = slotPoolBuilder .setBatchSlotTimeout(batchSlotTimeout) - .setResourceManagerGateway(testingResourceManagerGateway) + .setResourceManagerGateway(resourceManagerGateway) .build()) { final CompletableFuture slotFuture = @@ -211,11 +203,11 @@ public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exce */ @Test public void testPendingBatchSlotRequestFailsIfRMRequestFailsUnfulfillably() throws Exception { - testingResourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally( + resourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally( new UnfulfillableSlotRequestException(new AllocationID(), ResourceProfile.UNKNOWN))); try (final SlotPoolImpl slotPool = slotPoolBuilder - .setResourceManagerGateway(testingResourceManagerGateway) + .setResourceManagerGateway(resourceManagerGateway) .build()) { final CompletableFuture slotFuture = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java index b7c3ea38b03ad..e64ee368d7b78 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java @@ -36,14 +36,12 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.TestLogger; import org.apache.flink.util.clock.ManualClock; import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; @@ -86,7 +84,7 @@ /** * Tests for the {@link SlotPoolImpl}. */ -public class SlotPoolImplTest extends TestLogger { +public class SlotPoolImplTest extends SlotPoolTestBase { private static final Time TIMEOUT = Time.seconds(10L); private static final ComponentMainThreadExecutor mainThreadExecutor = @@ -94,15 +92,13 @@ public class SlotPoolImplTest extends TestLogger { private TaskManagerLocation taskManagerLocation; private SimpleAckingTaskManagerGateway taskManagerGateway; - private TestingResourceManagerGateway resourceManagerGateway; - private SlotPoolBuilder slotPoolBuilder; @Before + @Override public void setup() throws Exception { taskManagerLocation = new LocalTaskManagerLocation(); taskManagerGateway = new SimpleAckingTaskManagerGateway(); - resourceManagerGateway = new TestingResourceManagerGateway(); - slotPoolBuilder = new SlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway); + super.setup(); } @Test @@ -781,25 +777,6 @@ public void testSlotsOfferedWithoutResourceManagerConnected() throws Exception { } } - private void requestNewAllocatedSlots(final SlotPool slotPool, final SlotRequestId... slotRequestIds) { - for (SlotRequestId slotRequestId : slotRequestIds) { - requestNewAllocatedSlot(slotPool, slotRequestId); - } - } - - private CompletableFuture requestNewAllocatedSlot( - final SlotPool slotPool, - final SlotRequestId slotRequestId) { - return requestNewAllocatedSlot(slotPool, slotRequestId, TIMEOUT); - } - - private CompletableFuture requestNewAllocatedSlot( - final SlotPool slotPool, - final SlotRequestId slotRequestId, - final Time timeout) { - return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout); - } - private void offerSlot(final SlotPoolImpl slotPool, final AllocationID allocationId) { final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.ANY); slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java index 127de937817cd..13ca7c453b6cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java @@ -27,16 +27,13 @@ import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.ThrowingRunnable; -import org.junit.Before; import org.junit.Test; import java.util.ArrayList; @@ -59,20 +56,7 @@ /** * Tests for the failing of pending slot requests at the {@link SlotPool}. */ -public class SlotPoolPendingRequestFailureTest extends TestLogger { - - public static final Time TIMEOUT = Time.seconds(10L); - private static final ComponentMainThreadExecutor mainThreadExecutor = - ComponentMainThreadExecutorServiceAdapter.forMainThread(); - - private TestingResourceManagerGateway resourceManagerGateway; - private SlotPoolBuilder slotPoolBuilder; - - @Before - public void setup() throws Exception { - resourceManagerGateway = new TestingResourceManagerGateway(); - slotPoolBuilder = new SlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway); - } +public class SlotPoolPendingRequestFailureTest extends SlotPoolTestBase { /** * Tests that failing an allocation fails the pending slot request. @@ -202,12 +186,4 @@ public void testPendingSlotRequestTimeout() throws Exception { singleThreadExecutor.shutdownNow(); } } - - private CompletableFuture requestNewAllocatedSlot(SlotPoolImpl slotPool, SlotRequestId slotRequestId) { - return requestNewAllocatedSlot(slotPool, slotRequestId, TIMEOUT); - } - - private CompletableFuture requestNewAllocatedSlot(SlotPoolImpl slotPool, SlotRequestId slotRequestId, Time timeout) { - return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java index f41a5905d77ed..364cc235addd2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java @@ -19,23 +19,17 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.CheckedSupplier; -import org.junit.Before; import org.junit.Test; import java.util.ArrayList; @@ -57,21 +51,7 @@ /** * Tests how the {@link SlotPoolImpl} completes slot requests. */ -public class SlotPoolRequestCompletionTest extends TestLogger { - - private static final Time TIMEOUT = Time.seconds(10L); - private static final ComponentMainThreadExecutor mainThreadExecutor = - ComponentMainThreadExecutorServiceAdapter.forMainThread(); - - - private TestingResourceManagerGateway resourceManagerGateway; - private SlotPoolBuilder slotPoolBuilder; - - @Before - public void setUp() throws Exception { - resourceManagerGateway = new TestingResourceManagerGateway(); - slotPoolBuilder = new SlotPoolBuilder(mainThreadExecutor); - } +public class SlotPoolRequestCompletionTest extends SlotPoolTestBase { /** * Tests that the {@link SlotPoolImpl} completes slots in request order. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestBase.java new file mode 100644 index 0000000000000..2da0c17333d8b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestBase.java @@ -0,0 +1,51 @@ +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; + +import java.util.concurrent.CompletableFuture; + +/** + * Test base for {@link SlotPool} related test cases. + */ +public abstract class SlotPoolTestBase extends TestLogger { + protected static final Time TIMEOUT = Time.seconds(10L); + + protected final ComponentMainThreadExecutor mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forMainThread(); + + protected TestingResourceManagerGateway resourceManagerGateway; + protected SlotPoolBuilder slotPoolBuilder; + + @Before + public void setup() throws Exception { + resourceManagerGateway = new TestingResourceManagerGateway(); + slotPoolBuilder = new SlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway); + } + + protected void requestNewAllocatedSlots(final SlotPool slotPool, final SlotRequestId... slotRequestIds) { + for (SlotRequestId slotRequestId : slotRequestIds) { + requestNewAllocatedSlot(slotPool, slotRequestId); + } + } + + protected CompletableFuture requestNewAllocatedSlot( + final SlotPool slotPool, + final SlotRequestId slotRequestId) { + return requestNewAllocatedSlot(slotPool, slotRequestId, TIMEOUT); + } + + protected CompletableFuture requestNewAllocatedSlot( + final SlotPool slotPool, + final SlotRequestId slotRequestId, + final Time timeout) { + return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout); + } +}