Skip to content

Commit

Permalink
[FLINK-1679] deprecate API methods to set the parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
mxm committed Mar 23, 2015
1 parent 35f3416 commit d994d2e
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 35 deletions.
Expand Up @@ -137,8 +137,25 @@ public boolean isClosureCleanerEnabled() {
* *
* @return The parallelism used by operations, unless they override that value. This method * @return The parallelism used by operations, unless they override that value. This method
* returns {@code -1}, if the environment's default parallelism should be used. * returns {@code -1}, if the environment's default parallelism should be used.
* @deprecated Please use {@link #getParallelism}
*/ */
@Deprecated
public int getDegreeOfParallelism() { public int getDegreeOfParallelism() {
return getParallelism();
}

/**
* Gets the parallelism with which operation are executed by default. Operations can
* individually override this value to use a specific parallelism.
*
* Other operations may need to run with a different parallelism - for example calling
* a reduce operation over the entire data set will involve an operation that runs
* with a parallelism of one (the final reduce to the single result value).
*
* @return The parallelism used by operations, unless they override that value. This method
* returns {@code -1}, if the environment's default parallelism should be used.
*/
public int getParallelism() {
return parallelism; return parallelism;
} }


Expand All @@ -150,14 +167,32 @@ public int getDegreeOfParallelism() {
* This method overrides the default parallelism for this environment. * This method overrides the default parallelism for this environment.
* The local execution environment uses by default a value equal to the number of hardware * The local execution environment uses by default a value equal to the number of hardware
* contexts (CPU cores / threads). When executing the program via the command line client * contexts (CPU cores / threads). When executing the program via the command line client
* from a JAR file, the default degree of parallelism is the one configured for that setup. * from a JAR file, the default parallelism is the one configured for that setup.
* *
* @param parallelism The parallelism to use * @param parallelism The parallelism to use
* @deprecated Please use {@link #setParallelism}
*/ */
@Deprecated
public ExecutionConfig setDegreeOfParallelism(int parallelism) { public ExecutionConfig setDegreeOfParallelism(int parallelism) {
return setParallelism(parallelism);
}

/**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
* x parallel instances.
* <p>
* This method overrides the default parallelism for this environment.
* The local execution environment uses by default a value equal to the number of hardware
* contexts (CPU cores / threads). When executing the program via the command line client
* from a JAR file, the default parallelism is the one configured for that setup.
*
* @param parallelism The parallelism to use
*/
public ExecutionConfig setParallelism(int parallelism) {
if (parallelism < 1 && parallelism != -1) { if (parallelism < 1 && parallelism != -1) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Degree of parallelism must be at least one, or -1 (use system default)."); "Parallelism must be at least one, or -1 (use system default).");
} }
this.parallelism = parallelism; this.parallelism = parallelism;
return this; return this;
Expand Down
Expand Up @@ -40,7 +40,7 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {


protected String name; // the name of the contract instance. optional. protected String name; // the name of the contract instance. optional.


private int degreeOfParallelism = -1; // the number of parallel instances to use. -1, if unknown private int parallelism = -1; // the number of parallel instances to use. -1, if unknown


/** /**
* The return type of the user function. * The return type of the user function.
Expand Down Expand Up @@ -160,25 +160,50 @@ public void setParameter(String key, boolean value) {
} }


/** /**
* Gets the degree of parallelism for this contract instance. The degree of parallelism denotes * Gets the parallelism for this contract instance. The parallelism denotes
* how many parallel instances of the user function will be spawned during the execution. If this * how many parallel instances of the user function will be spawned during the execution. If this
* value is <code>-1</code>, then the system will decide the number of parallel instances by itself. * value is <code>-1</code>, then the system will decide the number of parallel instances by itself.
* *
* @return The degree of parallelism. * @return The parallelism.
* @deprecated Please use {@link #getParallelism}
*/ */
@Deprecated
public int getDegreeOfParallelism() { public int getDegreeOfParallelism() {
return this.degreeOfParallelism; return getParallelism();
} }


/** /**
* Sets the degree of parallelism for this contract instance. The degree of parallelism denotes * Gets the parallelism for this contract instance. The parallelism denotes
* how many parallel instances of the user function will be spawned during the execution. If this
* value is <code>-1</code>, then the system will decide the number of parallel instances by itself.
*
* @return The parallelism.
*/
public int getParallelism() {
return this.parallelism;
}

/**
* Sets the parallelism for this contract instance. The parallelism denotes
* how many parallel instances of the user function will be spawned during the execution. Set this * how many parallel instances of the user function will be spawned during the execution. Set this
* value to <code>-1</code> to let the system decide on its own. * value to <code>-1</code> to let the system decide on its own.
* *
* @param degree The number of parallel instances to spawn. -1, if unspecified. * @param parallelism The number of parallel instances to spawn. -1, if unspecified.
* @deprecated Please use {@link #setParallelism}
*/
@Deprecated
public void setDegreeOfParallelism(int parallelism) {
setParallelism(parallelism);
}
/**
* Sets the parallelism for this contract instance. The parallelism denotes
* how many parallel instances of the user function will be spawned during the execution. Set this
* value to <code>-1</code> to let the system decide on its own.
*
* @param parallelism The number of parallel instances to spawn. -1, if unspecified.
*/ */
public void setDegreeOfParallelism(int degree) { public void setParallelism(int parallelism) {
this.degreeOfParallelism = degree; this.parallelism = parallelism;
} }




Expand Down
Expand Up @@ -32,9 +32,18 @@ public JobExecutionResult execute(String jobName) throws Exception {
CollectionExecutor exec = new CollectionExecutor(getConfig()); CollectionExecutor exec = new CollectionExecutor(getConfig());
return exec.execute(p); return exec.execute(p);
} }


/**
* @deprecated Please use {@link #getParallelism}
*/
@Override @Override
@Deprecated
public int getDegreeOfParallelism() { public int getDegreeOfParallelism() {
return getParallelism();
}

@Override
public int getParallelism() {
return 1; // always serial return 1; // always serial
} }


Expand Down
Expand Up @@ -142,9 +142,26 @@ public ExecutionConfig getConfig() {
* *
* @return The degree of parallelism used by operations, unless they override that value. This method * @return The degree of parallelism used by operations, unless they override that value. This method
* returns {@code -1}, if the environments default parallelism should be used. * returns {@code -1}, if the environments default parallelism should be used.
* @deprecated Please use {@link #getParallelism}
*/ */
@Deprecated
public int getDegreeOfParallelism() { public int getDegreeOfParallelism() {
return config.getDegreeOfParallelism(); return getParallelism();
}

/**
* Gets the parallelism with which operation are executed by default. Operations can
* individually override this value to use a specific parallelism via
* {@link Operator#setParallelism(int)}. Other operations may need to run with a different
* parallelism - for example calling
* {@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)} over the entire
* set will insert eventually an operation that runs non-parallel (parallelism of one).
*
* @return The parallelism used by operations, unless they override that value. This method
* returns {@code -1}, if the environments default parallelism should be used.
*/
public int getParallelism() {
return config.getParallelism();
} }


/** /**
Expand All @@ -157,10 +174,28 @@ public int getDegreeOfParallelism() {
* contexts (CPU cores / threads). When executing the program via the command line client * contexts (CPU cores / threads). When executing the program via the command line client
* from a JAR file, the default degree of parallelism is the one configured for that setup. * from a JAR file, the default degree of parallelism is the one configured for that setup.
* *
* @param degreeOfParallelism The degree of parallelism * @param parallelism The parallelism
* @deprecated Please use {@link #setParallelism}
*/
@Deprecated
public void setDegreeOfParallelism(int parallelism) {
setParallelism(parallelism);
}

/**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
* x parallel instances.
* <p>
* This method overrides the default parallelism for this environment.
* The {@link LocalEnvironment} uses by default a value equal to the number of hardware
* contexts (CPU cores / threads). When executing the program via the command line client
* from a JAR file, the default parallelism is the one configured for that setup.
*
* @param parallelism The parallelism
*/ */
public void setDegreeOfParallelism(int degreeOfParallelism) { public void setParallelism(int parallelism) {
config.setDegreeOfParallelism(degreeOfParallelism); config.setParallelism(parallelism);
} }


/** /**
Expand Down
Expand Up @@ -82,17 +82,35 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
* x parallel instances. This value can be overridden by specific operations using * x parallel instances. This value can be overridden by specific operations using
* [[DataSet.setParallelism]]. * [[DataSet.setParallelism]].
* @deprecated Please use [[setParallelism]]
*/ */
def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = { @deprecated
javaEnv.setDegreeOfParallelism(degreeOfParallelism) def setDegreeOfParallelism(parallelism: Int): Unit = {
setParallelism(parallelism)
} }


/** /**
* Returns the default degree of parallelism for this execution environment. Note that this * Returns the default degree of parallelism for this execution environment. Note that this
* value can be overridden by individual operations using [[DataSet.setParallelism] * value can be overridden by individual operations using [[DataSet.setParallelism]
*/ */
def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism def setParallelism(parallelism: Int): Unit = {

javaEnv.setParallelism(parallelism)
}

/**
* Returns the default parallelism for this execution environment. Note that this
* value can be overridden by individual operations using [[DataSet.setParallelism]]
* @deprecated Please use [[getParallelism]]
*/
@deprecated
def getDegreeOfParallelism = javaEnv.getParallelism

/**
* Returns the default parallelism for this execution environment. Note that this
* value can be overridden by individual operations using [[DataSet.setParallelism]]
*/
def getParallelism = javaEnv.getParallelism

/** /**
* Sets the number of times that failed tasks are re-executed. A value of zero * Sets the number of times that failed tasks are re-executed. A value of zero
* effectively disables fault tolerance. A value of "-1" indicates that the system * effectively disables fault tolerance. A value of "-1" indicates that the system
Expand Down
Expand Up @@ -127,9 +127,17 @@ public void setEdgesInput(Operator<Record> c) {
public Operator<?> getOutput() { public Operator<?> getOutput() {
return this.iteration; return this.iteration;
} }


public void setDegreeOfParallelism(int dop) { /**
this.iteration.setDegreeOfParallelism(dop); * @deprecated Please use {@link #setParallelism}
*/
@Deprecated
public void setDegreeOfParallelism(int parallelism) {
setParallelism(parallelism);
}

public void setParallelism(int parallelism) {
this.iteration.setParallelism(parallelism);
} }


public void setNumberOfIterations(int iterations) { public void setNumberOfIterations(int iterations) {
Expand Down
Expand Up @@ -97,31 +97,63 @@ public ExecutionConfig getConfig() {
* default. Operations can individually override this value to use a * default. Operations can individually override this value to use a
* specific degree of parallelism. * specific degree of parallelism.
* *
* @return The degree of parallelism used by operations, unless they * @return The parallelism used by operations, unless they
* override that value. * override that value.
* @deprecated Please use {@link #getParallelism}
*/ */
@Deprecated
public int getDegreeOfParallelism() { public int getDegreeOfParallelism() {
return config.getDegreeOfParallelism(); return getParallelism();
} }


/** /**
* Sets the degree of parallelism (DOP) for operations executed through this * Gets the parallelism with which operation are executed by
* environment. Setting a DOP of x here will cause all operators (such as * default. Operations can individually override this value to use a
* specific parallelism.
*
* @return The parallelism used by operations, unless they
* override that value.
*/
public int getParallelism() {
return config.getParallelism();
}

/**
* Sets the parallelism for operations executed through this
* environment. Setting a parallelism of x here will cause all operators (such as
* map, batchReduce) to run with x parallel instances. This method overrides * map, batchReduce) to run with x parallel instances. This method overrides
* the default parallelism for this environment. The * the default parallelism for this environment. The
* {@link LocalStreamEnvironment} uses by default a value equal to the * {@link LocalStreamEnvironment} uses by default a value equal to the
* number of hardware contexts (CPU cores / threads). When executing the * number of hardware contexts (CPU cores / threads). When executing the
* program via the command line client from a JAR file, the default degree * program via the command line client from a JAR file, the default degree
* of parallelism is the one configured for that setup. * of parallelism is the one configured for that setup.
* *
* @param degreeOfParallelism * @param parallelism The parallelism
* The degree of parallelism * @deprecated Please use {@link #setParallelism}
*/
@Deprecated
public StreamExecutionEnvironment setDegreeOfParallelism(int parallelism) {
return setParallelism(parallelism);
}

/**
* Sets the parallelism for operations executed through this
* environment. Setting a parallelism of x here will cause all operators (such as
* map, batchReduce) to run with x parallel instances. This method overrides
* the default parallelism for this environment. The
* {@link LocalStreamEnvironment} uses by default a value equal to the
* number of hardware contexts (CPU cores / threads). When executing the
* program via the command line client from a JAR file, the default degree
* of parallelism is the one configured for that setup.
*
* @param parallelism
* The parallelism
*/ */
public StreamExecutionEnvironment setDegreeOfParallelism(int degreeOfParallelism) { public StreamExecutionEnvironment setParallelism(int parallelism) {
if (degreeOfParallelism < 1) { if (parallelism < 1) {
throw new IllegalArgumentException("Degree of parallelism must be at least one."); throw new IllegalArgumentException("parallelism must be at least one.");
} }
config.setDegreeOfParallelism(degreeOfParallelism); config.setParallelism(parallelism);
return this; return this;
} }


Expand Down
Expand Up @@ -37,16 +37,36 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
* x parallel instances. This value can be overridden by specific operations using * x parallel instances. This value can be overridden by specific operations using
* [[DataStream.setParallelism]]. * [[DataStream.setParallelism]].
* @deprecated Please use [[setParallelism]]
*/ */
@deprecated
def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = { def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
javaEnv.setDegreeOfParallelism(degreeOfParallelism) javaEnv.setParallelism(degreeOfParallelism)
}

/**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
* with x parallel instances. This value can be overridden by specific operations using
* [[DataStream.setParallelism]].
*/
def setParallelism(parallelism: Int): Unit = {
javaEnv.setParallelism(parallelism)
} }


/** /**
* Returns the default degree of parallelism for this execution environment. Note that this * Returns the default parallelism for this execution environment. Note that this
* value can be overridden by individual operations using [[DataStream.setParallelism]]
* @deprecated Please use [[getParallelism]]
*/
@deprecated
def getDegreeOfParallelism = javaEnv.getParallelism

/**
* Returns the default parallelism for this execution environment. Note that this
* value can be overridden by individual operations using [[DataStream.setParallelism]] * value can be overridden by individual operations using [[DataStream.setParallelism]]
*/ */
def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism def getParallelism = javaEnv.getParallelism


/** /**
* Sets the maximum time frequency (milliseconds) for the flushing of the * Sets the maximum time frequency (milliseconds) for the flushing of the
Expand Down

0 comments on commit d994d2e

Please sign in to comment.