Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.apache.flink.util.FlinkException;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.function.Predicate;

Expand All @@ -47,15 +45,10 @@
* Additional {@link ExecutionGraph} restart tests {@link ExecutionGraphRestartTest} which
* require the usage of a {@link SlotProvider}.
*/
@RunWith(Parameterized.class)
public class ExecutionGraphCoLocationRestartTest extends SchedulerTestBase {

private static final int NUM_TASKS = 31;

public ExecutionGraphCoLocationRestartTest(SchedulerType schedulerType) {
super(schedulerType);
}

@Test
public void testConstraintsAfterRestart() throws Exception {
final long timeout = 5000L;
Expand All @@ -79,10 +72,10 @@ public void testConstraintsAfterRestart() throws Exception {
groupVertex,
groupVertex2);

if (schedulerType == SchedulerType.SLOT_POOL) {
// enable the queued scheduling for the slot pool
eg.setQueuedSchedulingAllowed(true);
}

// enable the queued scheduling for the slot pool
eg.setQueuedSchedulingAllowed(true);


assertEquals(JobStatus.CREATED, eg.getState());

Expand Down Expand Up @@ -122,7 +115,7 @@ private void validateConstraints(ExecutionGraph eg) {

ExecutionJobVertex[] tasks = eg.getAllVertices().values().toArray(new ExecutionJobVertex[2]);

for(int i = 0; i < NUM_TASKS; i++){
for (int i = 0; i < NUM_TASKS; i++) {
CoLocationConstraint constr1 = tasks[0].getTaskVertices()[i].getLocationConstraint();
CoLocationConstraint constr2 = tasks[1].getTaskVertices()[i].getLocationConstraint();
assertThat(constr1.isAssigned(), is(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.apache.flink.runtime.testingUtils.TestingUtils;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Collections;
import java.util.concurrent.ExecutionException;
Expand All @@ -40,13 +38,8 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(Parameterized.class)
public class ScheduleWithCoLocationHintTest extends SchedulerTestBase {

public ScheduleWithCoLocationHintTest(SchedulerType schedulerType) {
super(schedulerType);
}

@Test
public void scheduleAllSharedAndCoLocated() throws Exception {
JobVertexID jid1 = new JobVertexID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.apache.flink.runtime.testingUtils.TestingUtils;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -53,12 +51,7 @@
/**
* Tests for the {@link Scheduler} when scheduling individual tasks.
*/
@RunWith(Parameterized.class)
public class SchedulerIsolatedTasksTest extends SchedulerTestBase {

public SchedulerIsolatedTasksTest(SchedulerType schedulerType) {
super(schedulerType);
}

@Test
public void testScheduleImmediately() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.apache.flink.runtime.testingUtils.TestingUtils;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Collections;
import java.util.Random;
Expand All @@ -52,13 +50,8 @@
/**
* Tests for the scheduler when scheduling tasks in slot sharing groups.
*/
@RunWith(Parameterized.class)
public class SchedulerSlotSharingTest extends SchedulerTestBase {

public SchedulerSlotSharingTest(SchedulerType schedulerType) {
super(schedulerType);
}

@Test
public void scheduleSingleVertexType() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
Expand All @@ -52,20 +51,14 @@

import org.junit.After;
import org.junit.Before;
import org.junit.runners.Parameterized;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.SchedulerType.SCHEDULER;
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.SchedulerType.SLOT_POOL;
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;

/**
* Test base for scheduler related test cases. The test are
* executed with the {@link Scheduler} and the {@link SlotPool}.
Expand All @@ -74,49 +67,21 @@ public class SchedulerTestBase extends TestLogger {

protected TestingSlotProvider testingSlotProvider;

protected SchedulerType schedulerType;

private RpcService rpcService;

public enum SchedulerType {
SCHEDULER,
SLOT_POOL
}

@Parameterized.Parameters(name = "Scheduler type = {0}")
public static Collection<Object[]> schedulerTypes() {
return Arrays.asList(
new Object[]{SCHEDULER},
new Object[]{SLOT_POOL});
}

protected SchedulerTestBase(SchedulerType schedulerType) {
this.schedulerType = Preconditions.checkNotNull(schedulerType);
rpcService = null;
}

@Before
public void setup() throws Exception {
switch (schedulerType) {
case SCHEDULER:
testingSlotProvider = new TestingSchedulerSlotProvider(
new Scheduler(
TestingUtils.defaultExecutionContext()));
break;
case SLOT_POOL:
rpcService = new TestingRpcService();
final JobID jobId = new JobID();
final TestingSlotPool slotPool = new TestingSlotPool(
rpcService,
jobId,
LocationPreferenceSchedulingStrategy.getInstance());
testingSlotProvider = new TestingSlotPoolSlotProvider(slotPool);

final JobMasterId jobMasterId = JobMasterId.generate();
final String jobManagerAddress = "localhost";
slotPool.start(jobMasterId, jobManagerAddress);
break;
}
rpcService = new TestingRpcService();
final JobID jobId = new JobID();
final TestingSlotPool slotPool = new TestingSlotPool(
rpcService,
jobId,
LocationPreferenceSchedulingStrategy.getInstance());
testingSlotProvider = new TestingSlotPoolSlotProvider(slotPool);

final JobMasterId jobMasterId = JobMasterId.generate();
final String jobManagerAddress = "localhost";
slotPool.start(jobMasterId, jobManagerAddress);
}

@After
Expand Down Expand Up @@ -154,86 +119,6 @@ protected interface TestingSlotProvider extends SlotProvider {
void shutdown() throws Exception;
}

private static final class TestingSchedulerSlotProvider implements TestingSlotProvider {
private final Scheduler scheduler;

private TestingSchedulerSlotProvider(Scheduler scheduler) {
this.scheduler = Preconditions.checkNotNull(scheduler);
}

@Override
public CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestId slotRequestId,
ScheduledUnit task,
boolean allowQueued,
SlotProfile slotProfile,
Time allocationTimeout) {
return scheduler.allocateSlot(task, allowQueued, slotProfile, allocationTimeout);
}

@Override
public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
return CompletableFuture.completedFuture(Acknowledge.get());
}

@Override
public TaskManagerLocation addTaskManager(int numberSlots) {
final Instance instance = getRandomInstance(numberSlots);
scheduler.newInstanceAvailable(instance);

return instance.getTaskManagerLocation();
}

@Override
public void releaseTaskManager(ResourceID resourceId) {
final Instance instance = scheduler.getInstance(resourceId);

if (instance != null) {
scheduler.instanceDied(instance);
}
}

@Override
public int getNumberOfAvailableSlots() {
return scheduler.getNumberOfAvailableSlots();
}

@Override
public int getNumberOfLocalizedAssignments() {
return scheduler.getNumberOfLocalizedAssignments();
}

@Override
public int getNumberOfNonLocalizedAssignments() {
return scheduler.getNumberOfNonLocalizedAssignments();
}

@Override
public int getNumberOfUnconstrainedAssignments() {
return scheduler.getNumberOfUnconstrainedAssignments();
}

@Override
public int getNumberOfHostLocalizedAssignments() {
return 0;
}

@Override
public int getNumberOfSlots(SlotSharingGroup slotSharingGroup) {
return slotSharingGroup.getTaskAssignment().getNumberOfSlots();
}

@Override
public int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexId) {
return slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexId);
}

@Override
public void shutdown() {
scheduler.shutdown();
}
}

private static final class TestingSlotPoolSlotProvider implements TestingSlotProvider {

private final TestingSlotPool slotPool;
Expand Down