From 923d1b4309c10a86cfa8ea3c385ff751c59e29a4 Mon Sep 17 00:00:00 2001
From: Stephan Ewen
Date: Mon, 16 Feb 2015 21:40:06 +0100
Subject: [PATCH] Add autoparallelism to jobs

---
 .../client/program/AutoParallelismITCase.java | 118 ++++++++++++++++++
 .../flink/api/common/ExecutionConfig.java     |   6 +
 .../flink/runtime/jobmanager/JobManager.java  |   7 ++
 3 files changed, 131 insertions(+)
 create mode 100644 flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java

diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
new file mode 100644
index 0000000000000..c1fa8884779dc
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This test verifies that the auto parallelism is properly forwarded to the runtime.
+ */
+public class AutoParallelismITCase {
+
+	private static final int NUM_TM = 2;
+	private static final int SLOTS_PER_TM = 7;
+	private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
+
+	@Test
+	public void testProgramWithAutoParallelism() {
+
+		NepheleMiniCluster cluster = new NepheleMiniCluster();
+		cluster.setNumTaskManager(NUM_TM);
+		cluster.setTaskManagerNumSlots(SLOTS_PER_TM);
+
+		try {
+			cluster.start();
+
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRpcPort());
+			env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+			DataSet<Integer> result = env
+					.createInput(new ParallelismDependentInputFormat())
+					.mapPartition(new ParallelismDependentMapPartition());
+
+			List<Integer> resultCollection = new ArrayList<Integer>();
+			result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
+
+			env.execute();
+
+			assertEquals(PARALLELISM, resultCollection.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			try {
+				cluster.stop();
+			}
+			catch (Throwable t) {
+				// ignore exceptions on shutdown
+			}
+		}
+	}
+
+	private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+		private transient boolean emitted;
+
+		@Override
+		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+			assertEquals(PARALLELISM, numSplits);
+			return super.createInputSplits(numSplits);
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return emitted;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			if (emitted) {
+				return null;
+			}
+			emitted = true;
+			return 1;
+		}
+	}
+
+	private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer, Integer> {
+
+		@Override
+		public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
+			out.collect(getRuntimeContext().getIndexOfThisSubtask());
+		}
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 03d5e3a13fcc2..8216b25802811 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -31,6 +31,12 @@ public class ExecutionConfig implements Serializable {
 	// Key for storing it in the Job Configuration
 	public static final String CONFIG_KEY = "runtime.config";
 
+	/**
+	 * The constant to use for the degree of parallelism, if the system should use the number
+	 * of currently available slots.
+	 */
+	public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;
+
 	private boolean useClosureCleaner = true;
 	private int degreeOfParallelism = -1;
 	private int numberOfExecutionRetries = -1;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 4f61d943ea4fb..223c6c67ca978 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -40,6 +40,7 @@
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -374,6 +375,8 @@ public JobSubmissionResult submitJob(JobGraph job) throws IOException {
 				LOG.debug(String.format("Running master initialization of job %s (%s)", job.getJobID(), job.getName()));
 			}
 
+			final int numSlots = scheduler.getTotalNumberOfSlots();
+
 			for (AbstractJobVertex vertex : job.getVertices()) {
 				// check that the vertex has an executable class
 				String executableClass = vertex.getInvokableClassName();
@@ -383,6 +386,10 @@ public JobSubmissionResult submitJob(JobGraph job) throws IOException {
 
 				// master side initialization
 				vertex.initializeOnMaster(userCodeLoader);
+
+				if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+					vertex.setParallelism(numSlots);
+				}
 			}
 
 			// first topologically sort the job vertices to form the basis of creating the execution graph