Skip to content

Commit

Permalink
[FLINK-16300][tests] Introduce ExecutionGraphTestUtils#getExecutions(…
Browse files Browse the repository at this point in the history
……) to replace SchedulerTestUtils#getTestVertex(…)
  • Loading branch information
zhuzhurk committed Mar 4, 2020
1 parent f7c53cb commit 24a1eb4
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
Expand All @@ -45,7 +47,10 @@

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
Expand All @@ -60,6 +65,8 @@
*/
public class ExecutionGraphTestUtils {

private static final Time DEFAULT_TIMEOUT = AkkaUtils.getDefaultTimeout();

// ------------------------------------------------------------------------
// reaching states
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -409,8 +416,22 @@ public static ExecutionJobVertex getExecutionJobVertex(
ScheduledExecutorService executor,
ScheduleMode scheduleMode) throws Exception {

return getExecutionJobVertex(id, 1, null, executor, scheduleMode);
}

public static ExecutionJobVertex getExecutionJobVertex(
JobVertexID id,
int parallelism,
@Nullable SlotSharingGroup slotSharingGroup,
ScheduledExecutorService executor,
ScheduleMode scheduleMode) throws Exception {

JobVertex ajv = new JobVertex("TestVertex", id);
ajv.setInvokableClass(AbstractInvokable.class);
ajv.setParallelism(parallelism);
if (slotSharingGroup != null) {
ajv.setSlotSharingGroup(slotSharingGroup);
}

JobGraph jobGraph = new JobGraph(ajv);
jobGraph.setScheduleMode(scheduleMode);
Expand All @@ -431,6 +452,69 @@ public static ExecutionJobVertex getExecutionJobVertex(JobVertexID id) throws Ex
return getExecutionJobVertex(id, new DirectScheduledExecutorService());
}

public static Execution getExecution() throws Exception {
final ExecutionJobVertex ejv = getExecutionJobVertex(new JobVertexID());
return ejv.getTaskVertices()[0].getCurrentExecutionAttempt();
}

public static Execution getExecution(final TaskManagerLocation... preferredLocations) throws Exception {
return getExecution(mapToPreferredLocationFutures(preferredLocations));
}

private static Collection<CompletableFuture<TaskManagerLocation>> mapToPreferredLocationFutures(
final TaskManagerLocation... preferredLocations) {

final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = new ArrayList<>();
for (TaskManagerLocation preferredLocation : preferredLocations) {
preferredLocationFutures.add(CompletableFuture.completedFuture(preferredLocation));
}
return preferredLocationFutures;
}

public static Execution getExecution(
final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures) throws Exception {

final ExecutionJobVertex ejv = getExecutionJobVertex(new JobVertexID());
final TestExecutionVertex ev = new TestExecutionVertex(ejv, 0, new IntermediateResult[0], DEFAULT_TIMEOUT);
ev.setPreferredLocationFutures(preferredLocationFutures);
return ev.getCurrentExecutionAttempt();
}

public static Execution getExecution(
final JobVertexID jid,
final int subtaskIndex,
final int numTasks,
final SlotSharingGroup slotSharingGroup) throws Exception {

return getExecution(jid, subtaskIndex, numTasks, slotSharingGroup, null);
}

public static Execution getExecution(
final JobVertexID jid,
final int subtaskIndex,
final int numTasks,
final SlotSharingGroup slotSharingGroup,
@Nullable final TaskManagerLocation... locations) throws Exception {

final ExecutionJobVertex ejv = getExecutionJobVertex(
jid,
numTasks,
slotSharingGroup,
new DirectScheduledExecutorService(),
ScheduleMode.LAZY_FROM_SOURCES);
final TestExecutionVertex ev = new TestExecutionVertex(
ejv,
subtaskIndex,
new IntermediateResult[0],
DEFAULT_TIMEOUT);

if (locations != null) {
ev.setPreferredLocationFutures(mapToPreferredLocationFutures(locations));
}

return ev.getCurrentExecutionAttempt();
}

// ------------------------------------------------------------------------
// graph vertex verifications
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
Expand Down Expand Up @@ -61,6 +60,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecution;
import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
import static org.hamcrest.Matchers.containsInAnyOrder;
Expand Down Expand Up @@ -291,7 +291,7 @@ public void testSlotAllocationCancellationWhenExecutionCancelled() throws Except
* Tests that all preferred locations are calculated.
*/
@Test
public void testAllPreferredLocationCalculation() throws ExecutionException, InterruptedException {
public void testAllPreferredLocationCalculation() throws Exception {
final TaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation();
final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
final TaskManagerLocation taskManagerLocation3 = new LocalTaskManagerLocation();
Expand All @@ -300,7 +300,7 @@ public void testAllPreferredLocationCalculation() throws ExecutionException, Int
final CompletableFuture<TaskManagerLocation> locationFuture2 = new CompletableFuture<>();
final CompletableFuture<TaskManagerLocation> locationFuture3 = new CompletableFuture<>();

final Execution execution = SchedulerTestUtils.getTestVertex(Arrays.asList(locationFuture1, locationFuture2, locationFuture3));
final Execution execution = getExecution(Arrays.asList(locationFuture1, locationFuture2, locationFuture3));

CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = execution.calculatePreferredLocations(LocationPreferenceConstraint.ALL);

Expand All @@ -323,15 +323,15 @@ public void testAllPreferredLocationCalculation() throws ExecutionException, Int
* Tests that any preferred locations are calculated.
*/
@Test
public void testAnyPreferredLocationCalculation() throws ExecutionException, InterruptedException {
public void testAnyPreferredLocationCalculation() throws Exception {
final TaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation();
final TaskManagerLocation taskManagerLocation3 = new LocalTaskManagerLocation();

final CompletableFuture<TaskManagerLocation> locationFuture1 = CompletableFuture.completedFuture(taskManagerLocation1);
final CompletableFuture<TaskManagerLocation> locationFuture2 = new CompletableFuture<>();
final CompletableFuture<TaskManagerLocation> locationFuture3 = CompletableFuture.completedFuture(taskManagerLocation3);

final Execution execution = SchedulerTestUtils.getTestVertex(Arrays.asList(locationFuture1, locationFuture2, locationFuture3));
final Execution execution = getExecution(Arrays.asList(locationFuture1, locationFuture2, locationFuture3));

CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = execution.calculatePreferredLocations(LocationPreferenceConstraint.ANY);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
* An {@link ExecutionVertex} implementation for testing.
*/
public class TestExecutionVertex extends ExecutionVertex {

private Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures;

TestExecutionVertex(
ExecutionJobVertex jobVertex,
int subTaskIndex,
IntermediateResult[] producedDataSets,
Time timeout) {

super(
jobVertex,
subTaskIndex,
producedDataSets,
timeout);
}

public void setPreferredLocationFutures(final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures) {
this.preferredLocationFutures = preferredLocationFutures;
}

@Override
public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() {
return preferredLocationFutures != null ? preferredLocationFutures : super.getPreferredLocations();
}
}

0 comments on commit 24a1eb4

Please sign in to comment.