Skip to content

Commit

Permalink
Refactor the test cases with SlotPoolTestBase
Browse files Browse the repository at this point in the history
  • Loading branch information
Thesharing committed Jul 22, 2020
1 parent 3e263c1 commit 3a52893
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 91 deletions.
Expand Up @@ -22,14 +22,11 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.ManualClock;

import org.junit.Before;
Expand All @@ -51,21 +48,16 @@
/**
* Tests for batch slot requests.
*/
public class SlotPoolBatchSlotRequestTest extends TestLogger {

private static final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forMainThread();
public class SlotPoolBatchSlotRequestTest extends SlotPoolTestBase {

private static final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0, 1024);
private static final ResourceProfile smallerResourceProfile = ResourceProfile.fromResources(0.5, 512);
public static final CompletableFuture[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture[0];

private SlotPoolBuilder slotPoolBuilder;
private TestingResourceManagerGateway testingResourceManagerGateway;

@Before
@Override
public void setup() throws Exception {
testingResourceManagerGateway = new TestingResourceManagerGateway();
resourceManagerGateway = new TestingResourceManagerGateway();
slotPoolBuilder = new SlotPoolBuilder(mainThreadExecutor);
}

Expand Down Expand Up @@ -133,13 +125,13 @@ public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() th
@Test
public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exception {
final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
testingResourceManagerGateway.setRequestSlotConsumer(
resourceManagerGateway.setRequestSlotConsumer(
slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

final Time batchSlotTimeout = Time.milliseconds(1000L);
try (final SlotPoolImpl slotPool = slotPoolBuilder
.setBatchSlotTimeout(batchSlotTimeout)
.setResourceManagerGateway(testingResourceManagerGateway)
.setResourceManagerGateway(resourceManagerGateway)
.build()) {

final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(
Expand All @@ -164,11 +156,11 @@ public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exc
@Test
public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws Exception {
final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
testingResourceManagerGateway.setRequestSlotConsumer(
resourceManagerGateway.setRequestSlotConsumer(
slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

try (final SlotPoolImpl slotPool = slotPoolBuilder
.setResourceManagerGateway(testingResourceManagerGateway)
.setResourceManagerGateway(resourceManagerGateway)
.build()) {

final CompletableFuture<PhysicalSlot> slotFuture = SlotPoolUtils.requestNewAllocatedBatchSlot(
Expand All @@ -191,12 +183,12 @@ public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() thr
*/
@Test
public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exception {
testingResourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally(new FlinkException("Failed request")));
resourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally(new FlinkException("Failed request")));

final Time batchSlotTimeout = Time.milliseconds(1000L);
try (final SlotPoolImpl slotPool = slotPoolBuilder
.setBatchSlotTimeout(batchSlotTimeout)
.setResourceManagerGateway(testingResourceManagerGateway)
.setResourceManagerGateway(resourceManagerGateway)
.build()) {

final CompletableFuture<PhysicalSlot> slotFuture =
Expand All @@ -211,11 +203,11 @@ public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exce
*/
@Test
public void testPendingBatchSlotRequestFailsIfRMRequestFailsUnfulfillably() throws Exception {
testingResourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally(
resourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally(
new UnfulfillableSlotRequestException(new AllocationID(), ResourceProfile.UNKNOWN)));

try (final SlotPoolImpl slotPool = slotPoolBuilder
.setResourceManagerGateway(testingResourceManagerGateway)
.setResourceManagerGateway(resourceManagerGateway)
.build()) {

final CompletableFuture<PhysicalSlot> slotFuture =
Expand Down
Expand Up @@ -36,14 +36,12 @@
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.ManualClock;

import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -86,23 +84,21 @@
/**
* Tests for the {@link SlotPoolImpl}.
*/
public class SlotPoolImplTest extends TestLogger {
public class SlotPoolImplTest extends SlotPoolTestBase {

private static final Time TIMEOUT = Time.seconds(10L);
private static final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forMainThread();

private TaskManagerLocation taskManagerLocation;
private SimpleAckingTaskManagerGateway taskManagerGateway;
private TestingResourceManagerGateway resourceManagerGateway;
private SlotPoolBuilder slotPoolBuilder;

@Before
@Override
public void setup() throws Exception {
taskManagerLocation = new LocalTaskManagerLocation();
taskManagerGateway = new SimpleAckingTaskManagerGateway();
resourceManagerGateway = new TestingResourceManagerGateway();
slotPoolBuilder = new SlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
super.setup();
}

@Test
Expand Down Expand Up @@ -781,25 +777,6 @@ public void testSlotsOfferedWithoutResourceManagerConnected() throws Exception {
}
}

private void requestNewAllocatedSlots(final SlotPool slotPool, final SlotRequestId... slotRequestIds) {
for (SlotRequestId slotRequestId : slotRequestIds) {
requestNewAllocatedSlot(slotPool, slotRequestId);
}
}

private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
final SlotPool slotPool,
final SlotRequestId slotRequestId) {
return requestNewAllocatedSlot(slotPool, slotRequestId, TIMEOUT);
}

private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
final SlotPool slotPool,
final SlotRequestId slotRequestId,
final Time timeout) {
return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout);
}

private void offerSlot(final SlotPoolImpl slotPool, final AllocationID allocationId) {
final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.ANY);
slotPool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer);
Expand Down
Expand Up @@ -27,16 +27,13 @@
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingRunnable;

import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
Expand All @@ -59,20 +56,7 @@
/**
* Tests for the failing of pending slot requests at the {@link SlotPool}.
*/
public class SlotPoolPendingRequestFailureTest extends TestLogger {

public static final Time TIMEOUT = Time.seconds(10L);
private static final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forMainThread();

private TestingResourceManagerGateway resourceManagerGateway;
private SlotPoolBuilder slotPoolBuilder;

@Before
public void setup() throws Exception {
resourceManagerGateway = new TestingResourceManagerGateway();
slotPoolBuilder = new SlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
}
public class SlotPoolPendingRequestFailureTest extends SlotPoolTestBase {

/**
* Tests that failing an allocation fails the pending slot request.
Expand Down Expand Up @@ -202,12 +186,4 @@ public void testPendingSlotRequestTimeout() throws Exception {
singleThreadExecutor.shutdownNow();
}
}

private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(SlotPoolImpl slotPool, SlotRequestId slotRequestId) {
return requestNewAllocatedSlot(slotPool, slotRequestId, TIMEOUT);
}

private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(SlotPoolImpl slotPool, SlotRequestId slotRequestId, Time timeout) {
return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout);
}
}
Expand Up @@ -19,23 +19,17 @@
package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.CheckedSupplier;

import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
Expand All @@ -57,21 +51,7 @@
/**
* Tests how the {@link SlotPoolImpl} completes slot requests.
*/
public class SlotPoolRequestCompletionTest extends TestLogger {

private static final Time TIMEOUT = Time.seconds(10L);
private static final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forMainThread();


private TestingResourceManagerGateway resourceManagerGateway;
private SlotPoolBuilder slotPoolBuilder;

@Before
public void setUp() throws Exception {
resourceManagerGateway = new TestingResourceManagerGateway();
slotPoolBuilder = new SlotPoolBuilder(mainThreadExecutor);
}
public class SlotPoolRequestCompletionTest extends SlotPoolTestBase {

/**
* Tests that the {@link SlotPoolImpl} completes slots in request order.
Expand Down
@@ -0,0 +1,51 @@
package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.util.TestLogger;

import org.junit.Before;

import java.util.concurrent.CompletableFuture;

/**
* Test base for {@link SlotPool} related test cases.
*/
public abstract class SlotPoolTestBase extends TestLogger {
protected static final Time TIMEOUT = Time.seconds(10L);

protected final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forMainThread();

protected TestingResourceManagerGateway resourceManagerGateway;
protected SlotPoolBuilder slotPoolBuilder;

@Before
public void setup() throws Exception {
resourceManagerGateway = new TestingResourceManagerGateway();
slotPoolBuilder = new SlotPoolBuilder(mainThreadExecutor).setResourceManagerGateway(resourceManagerGateway);
}

protected void requestNewAllocatedSlots(final SlotPool slotPool, final SlotRequestId... slotRequestIds) {
for (SlotRequestId slotRequestId : slotRequestIds) {
requestNewAllocatedSlot(slotPool, slotRequestId);
}
}

protected CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
final SlotPool slotPool,
final SlotRequestId slotRequestId) {
return requestNewAllocatedSlot(slotPool, slotRequestId, TIMEOUT);
}

protected CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
final SlotPool slotPool,
final SlotRequestId slotRequestId,
final Time timeout) {
return slotPool.requestNewAllocatedSlot(slotRequestId, ResourceProfile.UNKNOWN, timeout);
}
}

0 comments on commit 3a52893

Please sign in to comment.