From 36e98c4f11b6d4237a9b568f66056614dcaac66c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B7=98=E6=B1=9F?= Date: Tue, 14 Feb 2017 12:37:18 +0800 Subject: [PATCH 1/2] [FLINK-5133][core] Add new setResource API for DataStream and DataSet --- .../flink/api/common/operators/Operator.java | 36 ++++++++ .../flink/api/java/operators/DataSink.java | 61 +++++++++++++ .../api/java/operators/DeltaIteration.java | 46 +++++++++- .../flink/api/java/operators/Operator.java | 64 +++++++++++++ .../java/operators/OperatorTranslation.java | 26 ++++-- .../flink/api/java/operator/OperatorTest.java | 14 +++ .../apache/flink/optimizer/plan/PlanNode.java | 9 ++ .../org/apache/flink/api/scala/DataSet.scala | 56 +++++++++++- .../streaming/api/datastream/DataStream.java | 19 ++++ .../api/datastream/DataStreamSink.java | 35 +++++++ .../SingleOutputStreamOperator.java | 34 +++++++ .../streaming/api/graph/StreamGraph.java | 7 ++ .../api/graph/StreamGraphGenerator.java | 4 + .../flink/streaming/api/graph/StreamNode.java | 16 ++++ .../transformations/StreamTransformation.java | 42 +++++++++ .../flink/streaming/api/DataStreamTest.java | 91 +++++++++++++++++++ .../streaming/api/scala/DataStream.scala | 38 ++++++++ .../streaming/api/scala/DataStreamTest.scala | 68 +++++++++++++- 18 files changed, 657 insertions(+), 9 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java index 7e70fd758a28f..385a4c60394da 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java @@ -45,6 +45,10 @@ public abstract class Operator implements Visitable> { private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances to use + private ResourceSpec minResource; // the minimum resource of the contract instance. 
optional + + private ResourceSpec maxResource; // the maximum resource of the contract instance. optional + /** * The return type of the user function. */ @@ -184,6 +188,38 @@ public int getParallelism() { public void setParallelism(int parallelism) { this.parallelism = parallelism; } + + /** + * Gets the minimum resource for this contract instance. The minimum resource denotes the + * smallest amount of resources needed by the user function during the execution. + * + * @return The minimum resource of this operator. + */ + public ResourceSpec getMinResource() { + return this.minResource; + } + + /** + * Gets the maximum resource for this contract instance. The maximum resource denotes the + * largest amount of resources needed by the user function during the execution. + * + * @return The maximum resource of this operator. + */ + public ResourceSpec getMaxResource() { + return this.maxResource; + } + + /** + * Sets the minimum and maximum resources for this contract instance. The resource denotes + * how much memory and how many CPU cores the user function will consume during the execution. + * + * @param minResource The minimum resource of this operator. + * @param maxResource The maximum resource of this operator. 
+ */ + public void setResource(ResourceSpec minResource, ResourceSpec maxResource) { + this.minResource = minResource; + this.maxResource = maxResource; + } /** diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java index 8b419d96c87d2..d78c89f8320f7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.typeinfo.NothingTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -51,6 +52,10 @@ public class DataSink { private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + private ResourceSpec minResource = ResourceSpec.UNKNOWN; + + private ResourceSpec maxResource = ResourceSpec.UNKNOWN; + private Configuration parameters; private int[] sortKeyPositions; @@ -278,4 +283,60 @@ public DataSink setParallelism(int parallelism) { return this; } + + /** + * Returns the minimum resource of this data sink. If no minimum resource has been set, + * it returns the default empty resource. + * + * @return The minimum resource of this data sink. + */ + public ResourceSpec getMinResource() { + return this.minResource; + } + + /** + * Returns the maximum resource of this data sink. If no maximum resource has been set, + * it returns the default empty resource. + * + * @return The maximum resource of this data sink. + */ + public ResourceSpec getMaxResource() { + return this.maxResource; + } + + /** + * Sets the minimum and maximum resources for this data sink. 
This overrides the default empty resource. + * The minimum resource must be satisfied and the maximum resource specifies the upper bound + * for dynamic resource resize. + * + * @param minResource The minimum resource for this data sink. + * @param maxResource The maximum resource for this data sink. + * @return The data sink with set minimum and maximum resources. + */ + public DataSink setResource(ResourceSpec minResource, ResourceSpec maxResource) { + Preconditions.checkArgument(minResource != null && maxResource != null, + "The min and max resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && maxResource.isValid() && minResource.lessThanOrEqual(maxResource), + "The values in resource must be not less than 0 and the max resource must be greater than the min resource."); + + this.minResource = minResource; + this.maxResource = maxResource; + + return this; + } + + /** + * Sets the resource for this data sink. This overrides the default empty minimum and maximum resources. + * + * @param resource The resource for this data sink. + * @return The data sink with set minimum and maximum resources. 
+ */ + public DataSink setResource(ResourceSpec resource) { + Preconditions.checkArgument(resource != null && resource.isValid(), "The resource must be not null and values greater than 0."); + + this.minResource = resource; + this.maxResource = resource; + + return this; + } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java index b97a9de35f6e4..eb56ee72bd5db 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.aggregators.AggregatorRegistry; import org.apache.flink.api.common.aggregators.ConvergenceCriterion; import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -62,10 +63,13 @@ public class DeltaIteration { private String name; private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + + private ResourceSpec minResource = ResourceSpec.UNKNOWN; + + private ResourceSpec maxResource = ResourceSpec.UNKNOWN; private boolean solutionSetUnManaged; - public DeltaIteration(ExecutionEnvironment context, TypeInformation type, DataSet solutionSet, DataSet workset, Keys keys, int maxIterations) { initialSolutionSet = solutionSet; initialWorkset = workset; @@ -192,6 +196,46 @@ public DeltaIteration parallelism(int parallelism) { public int getParallelism() { return parallelism; } + + /** + * Sets the minimum and maximum resources for the iteration. This overrides the default empty resource. + * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. 
+ * + * @param minResource The minimum resource for the iteration. + * @param maxResource The maximum resource for the iteration. + * @return The iteration with set minimum and maximum resources. + */ + public DeltaIteration setResource(ResourceSpec minResource, ResourceSpec maxResource) { + Preconditions.checkArgument(minResource != null && maxResource != null, + "The min and max resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && maxResource.isValid() && minResource.lessThanOrEqual(maxResource), + "The values in resource must be not less than 0 and the max resource must be greater than the min resource."); + + this.minResource = minResource; + this.maxResource = maxResource; + + return this; + } + + /** + * Gets the minimum resource from this iteration. If no minimum resource has been set, + * it returns the default empty resource. + * + * @return The minimum resource of the iteration. + */ + public ResourceSpec getMinResource() { + return this.minResource; + } + + /** + * Gets the maximum resource from this iteration. If no maximum resource has been set, + * it returns the default empty resource. + * + * @return The maximum resource of the iteration. + */ + public ResourceSpec getMaxResource() { + return this.maxResource; + } /** * Registers an {@link Aggregator} for the iteration. 
Aggregators can be used to maintain simple statistics during the diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java index 323d23eb3a49e..660d79bd5d61b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -38,6 +39,10 @@ public abstract class Operator> extends DataSet< protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + protected ResourceSpec minResource = ResourceSpec.UNKNOWN; + + protected ResourceSpec maxResource = ResourceSpec.UNKNOWN; + protected Operator(ExecutionEnvironment context, TypeInformation resultType) { super(context, resultType); } @@ -70,6 +75,26 @@ public int getParallelism() { return this.parallelism; } + /** + * Returns the minimum resource of this operator. If no minimum resource has been set, + * it returns the default empty resource. + * + * @return The minimum resource of this operator. + */ + public ResourceSpec getMinResource() { + return this.minResource; + } + + /** + * Returns the maximum resource of this operator. If no maximum resource has been set, + * it returns the default empty resource. + * + * @return The maximum resource of this operator. + */ + public ResourceSpec getMaxResource() { + return this.maxResource; + } + /** * Sets the name of this operator. 
This overrides the default name, which is either * a generated description of the operation (such as for example "Aggregate(1:SUM, 2:MIN)") @@ -103,4 +128,43 @@ public O setParallelism(int parallelism) { O returnType = (O) this; return returnType; } + + /** + * Sets the minimum and maximum resources for this operator. This overrides the default empty resource. + * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. + * + * @param minResource The minimum resource for this operator. + * @param maxResource The maximum resource for this operator. + * @return The operator with set minimum and maximum resources. + */ + public O setResource(ResourceSpec minResource, ResourceSpec maxResource) { + Preconditions.checkArgument(minResource != null && maxResource != null, + "The min and max resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && maxResource.isValid() && minResource.lessThanOrEqual(maxResource), + "The values in resource must be not less than 0 and the max resource must be greater than the min resource."); + + this.minResource = minResource; + this.maxResource = maxResource; + + @SuppressWarnings("unchecked") + O returnType = (O) this; + return returnType; + } + + /** + * Sets the resource for this operator. This overrides the default empty minimum and maximum resources. + * + * @param resource The resource for this operator. + * @return The operator with set minimum and maximum resources. 
+ */ + public O setResource(ResourceSpec resource) { + Preconditions.checkArgument(resource != null && resource.isValid(), "The resource must be not null and values greater than 0."); + + this.minResource = resource; + this.maxResource = resource; + + @SuppressWarnings("unchecked") + O returnType = (O) this; + return returnType; + } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java index 88c9c37e0e77a..9e7c9cd9765ad 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java @@ -63,7 +63,9 @@ private GenericDataSinkBase translate(DataSink sink) { // translate the sink itself and connect it to the input GenericDataSinkBase translatedSink = sink.translateToDataFlow(input); - + + translatedSink.setResource(sink.getMinResource(), sink.getMaxResource()); + return translatedSink; } @@ -91,19 +93,31 @@ private Operator translate(DataSet dataSet) { Operator dataFlowOp; if (dataSet instanceof DataSource) { - dataFlowOp = ((DataSource) dataSet).translateToDataFlow(); + DataSource dataSource = (DataSource) dataSet; + dataFlowOp = dataSource.translateToDataFlow(); + dataFlowOp.setResource(dataSource.getMinResource(), dataSource.getMaxResource()); } else if (dataSet instanceof SingleInputOperator) { - dataFlowOp = translateSingleInputOperator((SingleInputOperator) dataSet); + SingleInputOperator singleInputOperator = (SingleInputOperator) dataSet; + dataFlowOp = translateSingleInputOperator(singleInputOperator); + dataFlowOp.setResource(singleInputOperator.getMinResource(), singleInputOperator.getMaxResource()); } else if (dataSet instanceof TwoInputOperator) { - dataFlowOp = translateTwoInputOperator((TwoInputOperator) dataSet); + TwoInputOperator twoInputOperator = (TwoInputOperator) dataSet; + dataFlowOp = 
translateTwoInputOperator(twoInputOperator); + dataFlowOp.setResource(twoInputOperator.getMinResource(), twoInputOperator.getMaxResource()); } else if (dataSet instanceof BulkIterationResultSet) { - dataFlowOp = translateBulkIteration((BulkIterationResultSet) dataSet); + BulkIterationResultSet bulkIterationResultSet = (BulkIterationResultSet) dataSet; + dataFlowOp = translateBulkIteration(bulkIterationResultSet); + dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().getMinResource(), + bulkIterationResultSet.getIterationHead().getMaxResource()); } else if (dataSet instanceof DeltaIterationResultSet) { - dataFlowOp = translateDeltaIteration((DeltaIterationResultSet) dataSet); + DeltaIterationResultSet deltaIterationResultSet = (DeltaIterationResultSet) dataSet; + dataFlowOp = translateDeltaIteration(deltaIterationResultSet); + dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResource(), + deltaIterationResultSet.getIterationHead().getMaxResource()); } else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) { throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action." 
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java index a69ca3c219060..3a107f1b9e2e6 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.operator; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.Operator; import org.apache.flink.api.java.typeutils.ValueTypeInfo; @@ -45,6 +46,19 @@ public void testConfigurationOfParallelism() { assertEquals(parallelism, operator.getParallelism()); } + @Test + public void testConfigurationOfResource() { + Operator operator = new MockOperator(); + + // verify explicit change in resource + ResourceSpec minResource = new ResourceSpec(1.0, 100, 0, 0, 0); + ResourceSpec maxResource = new ResourceSpec(2.0, 200, 0, 0, 0); + operator.setResource(minResource, maxResource); + + assertEquals(minResource, operator.getMinResource()); + assertEquals(maxResource, operator.getMaxResource()); + } + private class MockOperator extends Operator { public MockOperator() { super(ExecutionEnvironment.createCollectionsEnvironment(), ValueTypeInfo.NULL_VALUE_TYPE_INFO); diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java index b30fa36ba026e..77eae9831a027 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java @@ -19,6 +19,7 @@ package org.apache.flink.optimizer.plan; import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.ResourceSpec; 
import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.costs.Costs; @@ -308,6 +309,14 @@ public void setParallelism(int parallelism) { public int getParallelism() { return this.parallelism; } + + public ResourceSpec getMinResource() { + return this.template.getOperator().getMinResource(); + } + + public ResourceSpec getMaxResource() { + return this.template.getOperator().getMaxResource(); + } public long getGuaranteedAvailableMemory() { return this.template.getMinimalMemoryAcrossAllSubTasks(); diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 4e7be042901ff..56b53c35c9181 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.common.accumulators.SerializedListAccumulator import org.apache.flink.api.common.aggregators.Aggregator import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat} -import org.apache.flink.api.common.operators.{Keys, Order} +import org.apache.flink.api.common.operators.{ResourceSpec, Keys, Order} import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod @@ -177,6 +177,60 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { "parallelism.") } + /** + * Sets the minimum and maximum resources of this operation. 
+ */ + def setResource(minResource: ResourceSpec, maxResource: ResourceSpec) = { + javaSet match { + case ds: DataSource[_] => ds.setResource(minResource, maxResource) + case op: Operator[_, _] => op.setResource(minResource, maxResource) + case di: DeltaIterationResultSet[_, _] => + di.getIterationHead.setResource(minResource, maxResource) + case _ => + throw new UnsupportedOperationException("Operator " + javaSet.toString + " cannot have " + + "resource.") + } + this + } + + /** + * Sets the resource of this operation. + */ + def setResource(resource: ResourceSpec) = { + javaSet match { + case ds: DataSource[_] => ds.setResource(resource, resource) + case op: Operator[_, _] => op.setResource(resource, resource) + case di: DeltaIterationResultSet[_, _] => + di.getIterationHead.setResource(resource, resource) + case _ => + throw new UnsupportedOperationException("Operator " + javaSet.toString + " cannot have " + + "resource.") + } + this + } + + /** + * Returns the minimum resource of this operation. + */ + def getMinResource: ResourceSpec = javaSet match { + case ds: DataSource[_] => ds.getMinResource + case op: Operator[_, _] => op.getMinResource + case _ => + throw new UnsupportedOperationException("Operator " + javaSet.toString + " does not have " + + "resource.") + } + + /** + * Returns the maximum resource of this operation. + */ + def getMaxResource: ResourceSpec = javaSet match { + case ds: DataSource[_] => ds.getMaxResource + case op: Operator[_, _] => op.getMaxResource + case _ => + throw new UnsupportedOperationException("Operator " + javaSet.toString + " does not have " + + "resource.") + } + /** * Registers an [[org.apache.flink.api.common.aggregators.Aggregator]] * for the iteration. 
Aggregators can be used to maintain simple statistics during the diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 204557db0100a..c6a4ea18e3490 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -142,6 +143,24 @@ public int getParallelism() { return transformation.getParallelism(); } + /** + * Gets the minimum resource for this operator. + * + * @return The minimum resource set for this operator. + */ + public ResourceSpec getMinResource() { + return transformation.getMinResource(); + } + + /** + * Gets the maximum resource for this operator. + * + * @return The maximum resource set for this operator. + */ + public ResourceSpec getMaxResource() { + return transformation.getMaxResource(); + } + /** * Gets the type of the stream. 
* diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index 0c9378bffb0ae..e93a320c2a7eb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -20,9 +20,11 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.transformations.SinkTransformation; +import org.apache.flink.util.Preconditions; /** * A Stream Sink. This is used for emitting elements from a streaming topology. @@ -113,6 +115,39 @@ public DataStreamSink setParallelism(int parallelism) { return this; } + /** + * Sets the minimum and maximum resources for this sink, and the lower and upper resource limits will + * be considered in resource resize feature for future plan. + * + * @param minResource The minimum resource for this sink. + * @param maxResource The maximum resource for this sink + * @return The sink with set minimum and maximum resources. 
+ */ + public DataStreamSink setResource(ResourceSpec minResource, ResourceSpec maxResource) { + Preconditions.checkArgument(minResource != null && maxResource != null, + "The min and max resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && maxResource.isValid() && minResource.lessThanOrEqual(maxResource), + "The values in resource must be not less than 0 and the max resource must be greater than the min resource."); + + transformation.setResource(minResource, maxResource); + + return this; + } + + /** + * Sets the resource for this sink, the minimum and maximum resources are the same by default. + * + * @param resource The resource for this sink. + * @return The sink with set minimum and maximum resources. + */ + public DataStreamSink setResource(ResourceSpec resource) { + Preconditions.checkArgument(resource != null && resource.isValid(), "The resource must be not null and values greater than 0."); + + transformation.setResource(resource, resource); + + return this; + } + /** * Turns off chaining for this operator so thread co-location will not be * used as an optimization. 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 9dd60b708b258..ad1d0ca78375c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeInfoParser; @@ -154,6 +155,39 @@ public SingleOutputStreamOperator setMaxParallelism(int maxParallelism) { return this; } + /** + * Sets the minimum and maximum resources for this operator, and the lower and upper resource limits will + * be considered in dynamic resource resize feature for future plan. + * + * @param minResource The minimum resource for this operator. + * @param maxResource The maximum resource for this operator. + * @return The operator with set minimum and maximum resources. 
+ */ + public SingleOutputStreamOperator setResource(ResourceSpec minResource, ResourceSpec maxResource) { + Preconditions.checkArgument(minResource != null && maxResource != null, + "The min and max resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && maxResource.isValid() && minResource.lessThanOrEqual(maxResource), + "The values in resource must be not less than 0 and the max resource must be greater than the min resource."); + + transformation.setResource(minResource, maxResource); + + return this; + } + + /** + * Sets the resource for this operator, the minimum and maximum resources are the same by default. + * + * @param resource The resource for this operator. + * @return The operator with set minimum and maximum resources. + */ + public SingleOutputStreamOperator setResource(ResourceSpec resource) { + Preconditions.checkArgument(resource != null && resource.isValid(), "The resources must be not null and values greater than 0."); + + transformation.setResource(resource, resource); + + return this; + } + private boolean canBeParallel() { return !nonParallel; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 696c04be4b1b3..f5bb6a5ac9e8b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -32,6 +32,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; @@ -413,6 +414,12 @@ public void setMaxParallelism(int 
vertexID, int maxParallelism) { } } + public void setResource(int vertexID, ResourceSpec minResource, ResourceSpec maxResource) { + if (getStreamNode(vertexID) != null) { + getStreamNode(vertexID).setResource(minResource, maxResource); + } + } + public void setOneInputStateKey(Integer vertexID, KeySelector keySelector, TypeSerializer keySerializer) { StreamNode node = getStreamNode(vertexID); node.setStatePartitioner1(keySelector); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index ddd05154d1006..a5019665b59fd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -202,6 +202,10 @@ private Collection transform(StreamTransformation transform) { streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash()); } + if (transform.getMinResource() != null && transform.getMaxResource() != null) { + streamGraph.setResource(transform.getId(), transform.getMinResource(), transform.getMaxResource()); + } + return transformedIds; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 19a369931c325..810fde98e81b1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.operators.ResourceSpec; import 
org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -48,6 +49,8 @@ public class StreamNode implements Serializable { * dynamic scaling and the number of key groups used for partitioned state. */ private int maxParallelism; + private ResourceSpec minResource; + private ResourceSpec maxResource; private Long bufferTimeout = null; private final String operatorName; private String slotSharingGroup; @@ -165,6 +168,19 @@ void setMaxParallelism(int maxParallelism) { this.maxParallelism = maxParallelism; } + public ResourceSpec getMinResource() { + return minResource != null ? minResource : ResourceSpec.UNKNOWN; + } + + public ResourceSpec getMaxResource() { + return maxResource != null ? maxResource : ResourceSpec.UNKNOWN; + } + + public void setResource(ResourceSpec minResource, ResourceSpec maxResource) { + this.minResource = minResource; + this.maxResource = maxResource; + } + public Long getBufferTimeout() { return bufferTimeout != null ? 
bufferTimeout : env.getBufferTimeout(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java index 5e1b3e27aa145..d8ff14fbc1ad8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -125,6 +126,18 @@ public static int getNewNodeId() { */ private int maxParallelism = -1; + /** + * The minimum resource for this stream transformation. It defines the lower limit for + * dynamic resource resize in future plan. + */ + private ResourceSpec minResource; + + /** + * The maximum resource for this stream transformation. It defines the upper limit for + * dynamic resource resize in future plan. + */ + private ResourceSpec maxResource; + /** * User-specified ID for this transformation. This is used to assign the * same operator ID across job restarts. There is also the automatically @@ -213,6 +226,35 @@ public void setMaxParallelism(int maxParallelism) { this.maxParallelism = maxParallelism; } + /** + * Sets the minimum and maximum resources for this stream transformation. + * + * @param minResource The minimum resource of this transformation. + * @param maxResource The maximum resource of this transformation. 
+ */ + public void setResource(ResourceSpec minResource, ResourceSpec maxResource) { + this.minResource = minResource; + this.maxResource = maxResource; + } + + /** + * Gets the minimum resource of this stream transformation. + * + * @return The minimum resource of this transformation. + */ + public ResourceSpec getMinResource() { + return minResource; + } + + /** + * Gets the maximum resource of this stream transformation. + * + * @return The maximum resource of this transformation. + */ + public ResourceSpec getMaxResource() { + return maxResource; + } + /** * Sets an user provided hash for this operator. This will be used AS IS the create the JobVertexID. *

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index eaac6b8cc9fac..a2b8e79030180 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -501,6 +502,96 @@ public void invoke(Long value) throws Exception { assertEquals(4, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism()); } + /** + * Tests whether resource gets set. 
+ */ + @Test + public void testResource() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + ResourceSpec minResource1 = new ResourceSpec(1.0, 100); + ResourceSpec maxResource1 = new ResourceSpec(2.0, 200); + + ResourceSpec minResource2 = new ResourceSpec(1.0, 200); + ResourceSpec maxResource2 = new ResourceSpec(2.0, 300); + + ResourceSpec minResource3 = new ResourceSpec(1.0, 300); + ResourceSpec maxResource3 = new ResourceSpec(2.0, 400); + + ResourceSpec minResource4 = new ResourceSpec(1.0, 400); + ResourceSpec maxResource4 = new ResourceSpec(2.0, 500); + + ResourceSpec minResource5 = new ResourceSpec(1.0, 500); + ResourceSpec maxResource5 = new ResourceSpec(2.0, 600); + + ResourceSpec minResource6 = new ResourceSpec(1.0, 600); + ResourceSpec maxResource6 = new ResourceSpec(2.0, 700); + + ResourceSpec minResource7 = new ResourceSpec(1.0, 700); + ResourceSpec maxResource7 = new ResourceSpec(2.0, 800); + + DataStream source1 = env.generateSequence(0, 0).setResource(minResource1, maxResource1); + DataStream map1 = source1.map(new MapFunction() { + @Override + public Long map(Long value) throws Exception { + return null; + } + }).setResource(minResource2, maxResource2); + + DataStream source2 = env.generateSequence(0, 0).setResource(minResource3, maxResource3); + DataStream map2 = source2.map(new MapFunction() { + @Override + public Long map(Long value) throws Exception { + return null; + } + }).setResource(minResource4, maxResource4); + + DataStream connected = map1.connect(map2) + .flatMap(new CoFlatMapFunction() { + @Override + public void flatMap1(Long value, Collector out) throws Exception { + } + @Override + public void flatMap2(Long value, Collector out) throws Exception { + } + }).setResource(minResource5, maxResource5); + + DataStream windowed = connected + .windowAll(GlobalWindows.create()) + .trigger(PurgingTrigger.of(CountTrigger.of(10))) + .fold(0L, new FoldFunction() { + private static final long 
serialVersionUID = 1L; + + @Override + public Long fold(Long accumulator, Long value) throws Exception { + return null; + } + }).setResource(minResource6, maxResource6); + + DataStreamSink sink = windowed.print().setResource(minResource7, maxResource7); + + assertEquals(minResource1, env.getStreamGraph().getStreamNode(source1.getId()).getMinResource()); + assertEquals(maxResource1, env.getStreamGraph().getStreamNode(source1.getId()).getMaxResource()); + + assertEquals(minResource2, env.getStreamGraph().getStreamNode(map1.getId()).getMinResource()); + assertEquals(maxResource2, env.getStreamGraph().getStreamNode(map1.getId()).getMaxResource()); + + assertEquals(minResource3, env.getStreamGraph().getStreamNode(source2.getId()).getMinResource()); + assertEquals(maxResource3, env.getStreamGraph().getStreamNode(source2.getId()).getMaxResource()); + + assertEquals(minResource4, env.getStreamGraph().getStreamNode(map2.getId()).getMinResource()); + assertEquals(maxResource4, env.getStreamGraph().getStreamNode(map2.getId()).getMaxResource()); + + assertEquals(minResource5, env.getStreamGraph().getStreamNode(connected.getId()).getMinResource()); + assertEquals(maxResource5, env.getStreamGraph().getStreamNode(connected.getId()).getMaxResource()); + + assertEquals(minResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getMinResource()); + assertEquals(maxResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getMaxResource()); + + assertEquals(minResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getMinResource()); + assertEquals(maxResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getMaxResource()); + } + @Test public void testTypeInfo() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index ba92f863d9cf9..d5a376616aa5d 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -22,6 +22,7 @@ import org.apache.flink.annotation.{Internal, Public, PublicEvolving} import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner} import org.apache.flink.api.common.io.OutputFormat +import org.apache.flink.api.common.operators.ResourceSpec import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.java.tuple.{Tuple => JavaTuple} @@ -144,6 +145,43 @@ class DataStream[T](stream: JavaStream[T]) { this } + + /** + * Returns the minimum resource of this operation. + */ + def getMinResource: ResourceSpec = stream.getMinResource() + + /** + * Returns the maximum resource of this operation. + */ + def getMaxResource: ResourceSpec = stream.getMaxResource() + + /** + * Sets the minimum and maximum resources of this operation. + */ + def setResource(minResource: ResourceSpec, maxResource: ResourceSpec): DataStream[T] = { + stream match { + case ds: SingleOutputStreamOperator[T] => ds.setResource(minResource, maxResource) + case _ => + throw new UnsupportedOperationException( + "Operator " + stream + " cannot set the resource.") + } + this + } + + /** + * Sets the resource of this operation. + */ + def setResource(resource: ResourceSpec): DataStream[T] = { + stream match { + case ds: SingleOutputStreamOperator[T] => ds.setResource(resource, resource) + case _ => + throw new UnsupportedOperationException( + "Operator " + stream + " cannot set the resource.") + } + this + } + /** * Gets the name of the current data stream. 
This name is * used by the visualization and logging during runtime. diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index adb59f2f1bf42..fdc50342b66cb 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala import java.lang import org.apache.flink.api.common.functions._ +import org.apache.flink.api.common.operators.ResourceSpec import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.functions.ProcessFunction @@ -34,7 +35,7 @@ import org.apache.flink.streaming.api.windowing.windows.GlobalWindow import org.apache.flink.streaming.runtime.partitioner._ import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.util.Collector -import org.junit.Assert.fail +import org.junit.Assert._ import org.junit.Test class DataStreamTest extends StreamingMultipleProgramsTestBase { @@ -291,6 +292,71 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { assert(4 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) } + /** + * Tests whether resource gets set. 
+ */ + @Test + def testResource() { + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + + val minResource1: ResourceSpec = new ResourceSpec(1.0, 100) + val maxResource1: ResourceSpec = new ResourceSpec(2.0, 200) + val minResource2: ResourceSpec = new ResourceSpec(1.0, 200) + val maxResource2: ResourceSpec = new ResourceSpec(2.0, 300) + val minResource3: ResourceSpec = new ResourceSpec(1.0, 300) + val maxResource3: ResourceSpec = new ResourceSpec(2.0, 400) + val minResource4: ResourceSpec = new ResourceSpec(1.0, 400) + val maxResource4: ResourceSpec = new ResourceSpec(2.0, 500) + val minResource5: ResourceSpec = new ResourceSpec(1.0, 500) + val maxResource5: ResourceSpec = new ResourceSpec(2.0, 600) + val minResource6: ResourceSpec = new ResourceSpec(1.0, 600) + val maxResource6: ResourceSpec = new ResourceSpec(2.0, 700) + val minResource7: ResourceSpec = new ResourceSpec(1.0, 700) + val maxResource7: ResourceSpec = new ResourceSpec(2.0, 800) + + val source1: DataStream[Long] = env.generateSequence(0, 0) + .setResource(minResource1, maxResource1) + val map1: DataStream[String] = source1 + .map(x => "") + .setResource(minResource2, maxResource2) + val source2: DataStream[Long] = env.generateSequence(0, 0) + .setResource(minResource3, maxResource3) + val map2: DataStream[String] = source2 + .map(x => "") + .setResource(minResource4, maxResource4) + + val connected: DataStream[String] = map1.connect(map2) + .flatMap({ (in, out: Collector[(String)]) => }, { (in, out: Collector[(String)]) => }) + .setResource(minResource5, maxResource5) + + val windowed = connected + .windowAll(GlobalWindows.create()) + .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5))) + .fold("")((accumulator: String, value: String) => "") + .setResource(minResource6, maxResource6) + + var sink = windowed.print().setResource(minResource7, maxResource7) + + val plan = env.getExecutionPlan + + assertEquals(minResource1, 
env.getStreamGraph.getStreamNode(source1.getId).getMinResource) + assertEquals(maxResource1, env.getStreamGraph.getStreamNode(source1.getId).getMaxResource) + assertEquals(minResource2, env.getStreamGraph.getStreamNode(map1.getId).getMinResource) + assertEquals(maxResource2, env.getStreamGraph.getStreamNode(map1.getId).getMaxResource) + assertEquals(minResource3, env.getStreamGraph.getStreamNode(source2.getId).getMinResource) + assertEquals(maxResource3, env.getStreamGraph.getStreamNode(source2.getId).getMaxResource) + assertEquals(minResource4, env.getStreamGraph.getStreamNode(map2.getId).getMinResource) + assertEquals(maxResource4, env.getStreamGraph.getStreamNode(map2.getId).getMaxResource) + assertEquals(minResource5, env.getStreamGraph.getStreamNode(connected.getId).getMinResource) + assertEquals(maxResource5, env.getStreamGraph.getStreamNode(connected.getId).getMaxResource) + assertEquals(minResource6, env.getStreamGraph.getStreamNode(windowed.getId).getMinResource) + assertEquals(maxResource6, env.getStreamGraph.getStreamNode(windowed.getId).getMaxResource) + assertEquals(minResource7, env.getStreamGraph.getStreamNode(sink.getTransformation.getId) + .getMinResource) + assertEquals(maxResource7, env.getStreamGraph.getStreamNode(sink.getTransformation.getId) + .getMaxResource) + } + @Test def testTypeInfo() { val env = StreamExecutionEnvironment.getExecutionEnvironment From c4aa9c051816b4ec5bb76bbab5910af535703416 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B7=98=E6=B1=9F?= Date: Mon, 27 Feb 2017 21:22:48 +0800 Subject: [PATCH 2/2] [FLINK-5133][core] Support to set resource for operator in DataStream and DataSet --- .../flink/api/common/operators/Operator.java | 20 ++--- .../flink/api/java/operators/DataSink.java | 45 ++++++----- .../api/java/operators/DeltaIteration.java | 49 +++++++---- .../flink/api/java/operators/Operator.java | 45 ++++++----- .../java/operators/OperatorTranslation.java | 14 ++-- .../flink/api/java/operator/OperatorTest.java | 9 ++- 
.../apache/flink/optimizer/plan/PlanNode.java | 4 +- .../org/apache/flink/api/scala/DataSet.scala | 55 ++++++------- .../streaming/api/datastream/DataStream.java | 10 +-- .../api/datastream/DataStreamSink.java | 33 ++++---- .../SingleOutputStreamOperator.java | 32 ++++---- .../streaming/api/graph/StreamGraph.java | 4 +- .../api/graph/StreamGraphGenerator.java | 4 +- .../flink/streaming/api/graph/StreamNode.java | 12 +-- .../transformations/StreamTransformation.java | 22 ++--- .../flink/streaming/api/DataStreamTest.java | 41 +++++----- .../streaming/api/scala/DataStream.scala | 37 ++++----- .../streaming/api/scala/DataStreamTest.scala | 81 +++++++++++-------- 18 files changed, 273 insertions(+), 244 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java index 385a4c60394da..a9dedfa6930d3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java @@ -45,9 +45,9 @@ public abstract class Operator implements Visitable> { private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances to use - private ResourceSpec minResource; // the minimum resource of the contract instance. optional + private ResourceSpec minResource; // the minimum resource of the contract instance. - private ResourceSpec maxResource; // the maximum resource of the contract instance. optional + private ResourceSpec preferredResource; // the preferred resource of the contract instance. /** * The return type of the user function. @@ -200,25 +200,25 @@ public ResourceSpec getMinResource() { } /** - * Gets the maximum resource for this contract instance. The maximum resource denotes how many + * Gets the preferred resource for this contract instance. 
The preferred resource denotes how many * resources will be needed in the maximum for the user function during the execution. * - * @return The maximum resource of this operator. + * @return The preferred resource of this operator. */ - public ResourceSpec getMaxResource() { - return this.maxResource; + public ResourceSpec getPreferredResource() { + return this.preferredResource; } /** - * Sets the minimum and maximum resources for this contract instance. The resource denotes + * Sets the minimum and preferred resources for this contract instance. The resource denotes * how many memories and cpu cores of the user function will be consumed during the execution. * * @param minResource The minimum resource of this operator. - * @param maxResource The maximum resource of this operator. + * @param preferredResource The preferred resource of this operator. */ - public void setResource(ResourceSpec minResource, ResourceSpec maxResource) { + public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) { this.minResource = minResource; - this.maxResource = maxResource; + this.preferredResource = preferredResource; } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java index d78c89f8320f7..3be9cc0cbb928 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java @@ -54,7 +54,7 @@ public class DataSink { private ResourceSpec minResource = ResourceSpec.UNKNOWN; - private ResourceSpec maxResource = ResourceSpec.UNKNOWN; + private ResourceSpec preferredResource = ResourceSpec.UNKNOWN; private Configuration parameters; @@ -295,48 +295,51 @@ public ResourceSpec getMinResource() { } /** - * Returns the minimum resource of this data sink. If no maximum resource has been set, + * Returns the preferred resource of this data sink. 
If no preferred resource has been set, * it returns the default empty resource. * - * @return The maximum resource of this data sink. + * @return The preferred resource of this data sink. */ - public ResourceSpec getMaxResource() { - return this.maxResource; + public ResourceSpec getPreferredResource() { + return this.preferredResource; } /** - * Sets the minimum and maximum resources for this data sink. This overrides the default empty resource. - * The minimum resource must be satisfied and the maximum resource specifies the upper bound + * Sets the minimum and preferred resources for this data sink. This overrides the default empty resource. + * The minimum resource must be satisfied and the preferred resource specifies the upper bound * for dynamic resource resize. * * @param minResource The minimum resource for this data sink. - * @param maxResource The maximum resource for this data sink. - * @return The data sink with set minimum and maximum resources. + * @param preferredResource The preferred resource for this data sink. + * @return The data sink with set minimum and preferred resources. 
*/ - public DataSink setResource(ResourceSpec minResource, ResourceSpec maxResource) { - Preconditions.checkArgument(minResource != null && maxResource != null, - "The min and max resources must be not null."); - Preconditions.checkArgument(minResource.isValid() && maxResource.isValid() && minResource.lessThanOrEqual(maxResource), - "The values in resource must be not less than 0 and the max resource must be greater than the min resource."); + /* + public DataSink setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + Preconditions.checkNotNull(minResource != null && preferredResource != null, + "The min and preferred resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), + "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); this.minResource = minResource; - this.maxResource = maxResource; + this.preferredResource = preferredResource; return this; - } + }*/ /** - * Sets the resource for this data sink. This overrides the default empty minimum and maximum resources. + * Sets the resource for this data sink. This overrides the default empty minimum and preferred resources. * * @param resource The resource for this data sink. - * @return The data sink with set minimum and maximum resources. + * @return The data sink with set minimum and preferred resources. 
*/ + /* public DataSink setResource(ResourceSpec resource) { - Preconditions.checkArgument(resource != null && resource.isValid(), "The resource must be not null and values greater than 0."); + Preconditions.checkNotNull(resource != null, "The resource must be not null."); + Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0."); this.minResource = resource; - this.maxResource = resource; + this.preferredResource = resource; return this; - } + }*/ } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java index eb56ee72bd5db..cf0a63e576c46 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java @@ -66,7 +66,7 @@ public class DeltaIteration { private ResourceSpec minResource = ResourceSpec.UNKNOWN; - private ResourceSpec maxResource = ResourceSpec.UNKNOWN; + private ResourceSpec preferredResource = ResourceSpec.UNKNOWN; private boolean solutionSetUnManaged; @@ -198,24 +198,43 @@ public int getParallelism() { } /** - * Sets the minimum and maximum resources for the iteration. This overrides the default empty resource. + * Sets the minimum and preferred resources for the iteration. This overrides the default empty resource. * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. * * @param minResource The minimum resource for the iteration. - * @param maxResource The maximum resource for the iteration. - * @return The iteration with set minimum and maximum resources. + * @param preferredResource The preferred resource for the iteration. + * @return The iteration with set minimum and preferred resources. 
*/ - public DeltaIteration setResource(ResourceSpec minResource, ResourceSpec maxResource) { - Preconditions.checkArgument(minResource != null && maxResource != null, - "The min and max resources must be not null."); - Preconditions.checkArgument(minResource.isValid() && maxResource.isValid() && minResource.lessThanOrEqual(maxResource), - "The values in resource must be not less than 0 and the max resource must be greater than the min resource."); + /* + public DeltaIteration setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + Preconditions.checkNotNull(minResource != null && preferredResource != null, + "The min and preferred resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), + "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); this.minResource = minResource; - this.maxResource = maxResource; + this.preferredResource = preferredResource; return this; - } + }*/ + + /** + * Sets the resource for the iteration, and the minimum and preferred resources are the same by default. + * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. + * + * @param resource The resource for the iteration. + * @return The iteration with set minimum and preferred resources. + */ + /* + public DeltaIteration setResource(ResourceSpec resource) { + Preconditions.checkNotNull(resource != null, "The resource must be not null."); + Preconditions.checkArgument(resource.isValid(), "The values in resource must be not less than 0."); + + this.minResource = resource; + this.preferredResource = resource; + + return this; + }*/ /** * Gets the minimum resource from this iteration. If no minimum resource has been set, @@ -228,13 +247,13 @@ public ResourceSpec getMinResource() { } /** - * Gets the maximum resource from this iteration. 
If no maximum resource has been set, + * Gets the preferred resource from this iteration. If no preferred resource has been set, * it returns the default empty resource. * - * @return The maximum resource of the iteration. + * @return The preferred resource of the iteration. */ - public ResourceSpec getMaxResource() { - return this.maxResource; + public ResourceSpec getPreferredResource() { + return this.preferredResource; } /** diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java index 660d79bd5d61b..79cae14342469 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java @@ -41,7 +41,7 @@ public abstract class Operator> extends DataSet< protected ResourceSpec minResource = ResourceSpec.UNKNOWN; - protected ResourceSpec maxResource = ResourceSpec.UNKNOWN; + protected ResourceSpec preferredResource = ResourceSpec.UNKNOWN; protected Operator(ExecutionEnvironment context, TypeInformation resultType) { super(context, resultType); @@ -81,18 +81,18 @@ public int getParallelism() { * * @return The minimum resource of this operator. */ - public ResourceSpec getMinResource() { + public ResourceSpec minResource() { return this.minResource; } /** - * Returns the maximum resource of this operator. If no maximum resource has been set, + * Returns the preferred resource of this operator. If no preferred resource has been set, * it returns the default empty resource. * - * @return The maximum resource of this operator. + * @return The preferred resource of this operator. */ - public ResourceSpec getMaxResource() { - return this.maxResource; + public ResourceSpec preferredResource() { + return this.preferredResource; } /** @@ -130,41 +130,44 @@ public O setParallelism(int parallelism) { } /** - * Sets the minimum and maximum resources for this operator. 
This overrides the default empty resource. + * Sets the minimum and preferred resources for this operator. This overrides the default empty resource. * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. * * @param minResource The minimum resource for this operator. - * @param maxResource The maximum resource for this operator. - * @return The operator with set minimum and maximum resources. + * @param preferredResource The preferred resource for this operator. + * @return The operator with set minimum and preferred resources. */ - public O setResource(ResourceSpec minResource, ResourceSpec maxResource) { - Preconditions.checkArgument(minResource != null && maxResource != null, - "The min and max resources must be not null."); - Preconditions.checkArgument(minResource.isValid() && maxResource.isValid() && minResource.lessThanOrEqual(maxResource), - "The values in resource must be not less than 0 and the max resource must be greater than the min resource."); + /* + public O setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + Preconditions.checkNotNull(minResource != null && preferredResource != null, + "The min and preferred resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), + "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); this.minResource = minResource; - this.maxResource = maxResource; + this.preferredResource = preferredResource; @SuppressWarnings("unchecked") O returnType = (O) this; return returnType; - } + }*/ /** - * Sets the resource for this operator. This overrides the default empty minimum and maximum resources. + * Sets the resource for this operator. This overrides the default empty minimum and preferred resources. * * @param resource The resource for this operator. 
- * @return The operator with set minimum and maximum resources. + * @return The operator with set minimum and preferred resources. */ + /* public O setResource(ResourceSpec resource) { - Preconditions.checkArgument(resource != null && resource.isValid(), "The resource must be not null and values greater than 0."); + Preconditions.checkNotNull(resource != null, "The resource must be not null."); + Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0."); this.minResource = resource; - this.maxResource = resource; + this.preferredResource = resource; @SuppressWarnings("unchecked") O returnType = (O) this; return returnType; - } + }*/ } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java index 9e7c9cd9765ad..909cd32c092d1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java @@ -64,7 +64,7 @@ private GenericDataSinkBase translate(DataSink sink) { // translate the sink itself and connect it to the input GenericDataSinkBase translatedSink = sink.translateToDataFlow(input); - translatedSink.setResource(sink.getMinResource(), sink.getMaxResource()); + translatedSink.setResource(sink.getMinResource(), sink.getPreferredResource()); return translatedSink; } @@ -95,29 +95,29 @@ private Operator translate(DataSet dataSet) { if (dataSet instanceof DataSource) { DataSource dataSource = (DataSource) dataSet; dataFlowOp = dataSource.translateToDataFlow(); - dataFlowOp.setResource(dataSource.getMinResource(), dataSource.getMaxResource()); + dataFlowOp.setResource(dataSource.minResource(), dataSource.preferredResource()); } else if (dataSet instanceof SingleInputOperator) { SingleInputOperator singleInputOperator = (SingleInputOperator) dataSet; dataFlowOp = 
translateSingleInputOperator(singleInputOperator); - dataFlowOp.setResource(singleInputOperator.getMinResource(), singleInputOperator.getMaxResource()); + dataFlowOp.setResource(singleInputOperator.minResource, singleInputOperator.preferredResource()); } else if (dataSet instanceof TwoInputOperator) { TwoInputOperator twoInputOperator = (TwoInputOperator) dataSet; dataFlowOp = translateTwoInputOperator(twoInputOperator); - dataFlowOp.setResource(twoInputOperator.getMinResource(), twoInputOperator.getMaxResource()); + dataFlowOp.setResource(twoInputOperator.minResource(), twoInputOperator.preferredResource()); } else if (dataSet instanceof BulkIterationResultSet) { BulkIterationResultSet bulkIterationResultSet = (BulkIterationResultSet) dataSet; dataFlowOp = translateBulkIteration(bulkIterationResultSet); - dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().getMinResource(), - bulkIterationResultSet.getIterationHead().getMaxResource()); + dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().minResource(), + bulkIterationResultSet.getIterationHead().preferredResource()); } else if (dataSet instanceof DeltaIterationResultSet) { DeltaIterationResultSet deltaIterationResultSet = (DeltaIterationResultSet) dataSet; dataFlowOp = translateDeltaIteration(deltaIterationResultSet); dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResource(), - deltaIterationResultSet.getIterationHead().getMaxResource()); + deltaIterationResultSet.getIterationHead().getPreferredResource()); } else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) { throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action." 
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java index 3a107f1b9e2e6..992acc9433519 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java @@ -46,18 +46,19 @@ public void testConfigurationOfParallelism() { assertEquals(parallelism, operator.getParallelism()); } + /* @Test public void testConfigurationOfResource() { Operator operator = new MockOperator(); // verify explicit change in resource ResourceSpec minResource = new ResourceSpec(1.0, 100, 0, 0, 0); - ResourceSpec maxResource = new ResourceSpec(2.0, 200, 0, 0, 0); - operator.setResource(minResource, maxResource); + ResourceSpec preferredResource = new ResourceSpec(2.0, 200, 0, 0, 0); + operator.setResource(minResource, preferredResource); assertEquals(minResource, operator.getMinResource()); - assertEquals(maxResource, operator.getMaxResource()); - } + assertEquals(preferredResource, operator.getPreferredResource()); + }*/ private class MockOperator extends Operator { public MockOperator() { diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java index 77eae9831a027..4ef91b328394b 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java @@ -314,8 +314,8 @@ public ResourceSpec getMinResource() { return this.template.getOperator().getMinResource(); } - public ResourceSpec getMaxResource() { - return this.template.getOperator().getMaxResource(); + public ResourceSpec getPreferredResource() { + return this.template.getOperator().getPreferredResource(); } public long getGuaranteedAvailableMemory() { diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 56b53c35c9181..5cfb6018b4bcb 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -178,57 +178,50 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { } /** - * Sets the minimum and maximum resources of this operation. + * Sets the minimum and preferred resources of this operation. */ - def setResource(minResource: ResourceSpec, maxResource: ResourceSpec) = { + /* + def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : Unit = { javaSet match { - case ds: DataSource[_] => ds.setResource(minResource, maxResource) - case op: Operator[_, _] => op.setResource(minResource, maxResource) + case ds: DataSource[_] => ds.setResource(minResource, preferredResource) + case op: Operator[_, _] => op.setResource(minResource, preferredResource) case di: DeltaIterationResultSet[_, _] => - di.getIterationHead.setResource(minResource, maxResource) + di.getIterationHead.setResource(minResource, preferredResource) case _ => - throw new UnsupportedOperationException("Operator " + javaSet.toString + " cannot have " + - "resource.") + throw new UnsupportedOperationException("Operator does not support " + + "configuring custom resources specs.") } this - } + }*/ /** * Sets the resource of this operation. 
*/ - def setResource(resource: ResourceSpec) = { - javaSet match { - case ds: DataSource[_] => ds.setResource(resource, resource) - case op: Operator[_, _] => op.setResource(resource, resource) - case di: DeltaIterationResultSet[_, _] => - di.getIterationHead.setResource(resource, resource) - case _ => - throw new UnsupportedOperationException("Operator " + javaSet.toString + " cannot have " + - "resource.") - } - this - } + /* + def resource(resource: ResourceSpec) : Unit = { + this.resource(resource, resource) + }*/ /** * Returns the minimum resource of this operation. */ - def getMinResource: ResourceSpec = javaSet match { - case ds: DataSource[_] => ds.getMinResource - case op: Operator[_, _] => op.getMinResource + def minResource: ResourceSpec = javaSet match { + case ds: DataSource[_] => ds.minResource() + case op: Operator[_, _] => op.minResource case _ => - throw new UnsupportedOperationException("Operator " + javaSet.toString + " does not have " + - "resource.") + throw new UnsupportedOperationException("Operator does not support " + + "configuring custom resources specs.") } /** - * Returns the maximum resource of this operation. + * Returns the preferred resource of this operation. 
*/ - def getMaxResource: ResourceSpec = javaSet match { - case ds: DataSource[_] => ds.getMaxResource - case op: Operator[_, _] => op.getMaxResource + def preferredResource: ResourceSpec = javaSet match { + case ds: DataSource[_] => ds.preferredResource() + case op: Operator[_, _] => op.preferredResource case _ => - throw new UnsupportedOperationException("Operator " + javaSet.toString + " does not have " + - "resource.") + throw new UnsupportedOperationException("Operator does not support " + + "configuring custom resources specs.") } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index c6a4ea18e3490..ae1c39a09b920 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -148,17 +148,17 @@ public int getParallelism() { * * @return The minimum resource set for this operator. */ - public ResourceSpec getMinResource() { + public ResourceSpec minResource() { return transformation.getMinResource(); } /** - * Gets the maximum resource for this operator. + * Gets the preferred resource for this operator. * - * @return The maximum resource set for this operator. + * @return The preferred resource set for this operator. 
*/ - public ResourceSpec getMaxResource() { - return transformation.getMaxResource(); + public ResourceSpec preferredResource() { + return transformation.getPreferredResource(); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index e93a320c2a7eb..69e21d6d18fad 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -20,11 +20,9 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; -import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.transformations.SinkTransformation; -import org.apache.flink.util.Preconditions; /** * A Stream Sink. This is used for emitting elements from a streaming topology. @@ -116,37 +114,40 @@ public DataStreamSink setParallelism(int parallelism) { } /** - * Sets the minimum and maximum resources for this sink, and the lower and upper resource limits will + * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will * be considered in resource resize feature for future plan. * * @param minResource The minimum resource for this sink. - * @param maxResource The maximum resource for this sink - * @return The sink with set minimum and maximum resources. + * @param preferredResource The preferred resource for this sink + * @return The sink with set minimum and preferred resources. 
*/ - public DataStreamSink setResource(ResourceSpec minResource, ResourceSpec maxResource) { - Preconditions.checkArgument(minResource != null && maxResource != null, - "The min and max resources must be not null."); - Preconditions.checkArgument(minResource.isValid() && maxResource.isValid() && minResource.lessThanOrEqual(maxResource), - "The values in resource must be not less than 0 and the max resource must be greater than the min resource."); + /* + public DataStreamSink setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + Preconditions.checkArgument(minResource != null && preferredResource != null, + "The min and preferred resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), + "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); - transformation.setResource(minResource, maxResource); + transformation.setResource(minResource, preferredResource); return this; - } + }*/ /** - * Sets the resource for this sink, the minimum and maximum resources are the same by default. + * Sets the resource for this sink, the minimum and preferred resources are the same by default. * * @param resource The resource for this sink. - * @return The sink with set minimum and maximum resources. + * @return The sink with set minimum and preferred resources. 
*/ + /* public DataStreamSink setResource(ResourceSpec resource) { - Preconditions.checkArgument(resource != null && resource.isValid(), "The resource must be not null and values greater than 0."); + Preconditions.checkNotNull(resource, "The resource must be not null."); + Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0."); transformation.setResource(resource, resource); return this; - } + }*/ /** * Turns off chaining for this operator so thread co-location will not be diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index ad1d0ca78375c..d856603b6f72d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.InvalidTypesException; -import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeInfoParser; @@ -156,37 +155,40 @@ public SingleOutputStreamOperator setMaxParallelism(int maxParallelism) { } /** - * Sets the minimum and maximum resources for this operator, and the lower and upper resource limits will + * Sets the minimum and preferred resources for this operator, and the lower and upper resource limits will * be considered in dynamic resource resize feature for future plan. * * @param minResource The minimum resource for this operator. - * @param maxResource The maximum resource for this operator. 
- * @return The operator with set minimum and maximum resources. + * @param preferredResource The preferred resource for this operator. + * @return The operator with set minimum and preferred resources. */ - public SingleOutputStreamOperator setResource(ResourceSpec minResource, ResourceSpec maxResource) { - Preconditions.checkArgument(minResource != null && maxResource != null, - "The min and max resources must be not null."); - Preconditions.checkArgument(minResource.isValid() && maxResource.isValid() && minResource.lessThanOrEqual(maxResource), - "The values in resource must be not less than 0 and the max resource must be greater than the min resource."); + /* + public SingleOutputStreamOperator setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + Preconditions.checkArgument(minResource != null && preferredResource != null, + "The min and preferred resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), + "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); - transformation.setResource(minResource, maxResource); + transformation.setResource(minResource, preferredResource); return this; - } + }*/ /** - * Sets the resource for this operator, the minimum and maximum resources are the same by default. + * Sets the resource for this operator, the minimum and preferred resources are the same by default. * * @param resource The resource for this operator. - * @return The operator with set minimum and maximum resources. + * @return The operator with set minimum and preferred resources. 
*/ + /* public SingleOutputStreamOperator setResource(ResourceSpec resource) { - Preconditions.checkArgument(resource != null && resource.isValid(), "The resources must be not null and values greater than 0."); + Preconditions.checkNotNull(resource, "The resource must be not null."); + Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0."); transformation.setResource(resource, resource); return this; - } + }*/ private boolean canBeParallel() { return !nonParallel; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index f5bb6a5ac9e8b..fcbc607a8e276 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -414,9 +414,9 @@ public void setMaxParallelism(int vertexID, int maxParallelism) { } } - public void setResource(int vertexID, ResourceSpec minResource, ResourceSpec maxResource) { + public void setResource(int vertexID, ResourceSpec minResource, ResourceSpec preferredResource) { if (getStreamNode(vertexID) != null) { - getStreamNode(vertexID).setResource(minResource, maxResource); + getStreamNode(vertexID).setResource(minResource, preferredResource); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index a5019665b59fd..af92421d5f6d0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -202,8 +202,8 @@ private Collection transform(StreamTransformation transform) { 
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash()); } - if (transform.getMinResource() != null && transform.getMaxResource() != null) { - streamGraph.setResource(transform.getId(), transform.getMinResource(), transform.getMaxResource()); + if (transform.getMinResource() != null && transform.getPreferredResource() != null) { + streamGraph.setResource(transform.getId(), transform.getMinResource(), transform.getPreferredResource()); } return transformedIds; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 810fde98e81b1..0bf9adfb82369 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -50,7 +50,7 @@ public class StreamNode implements Serializable { */ private int maxParallelism; private ResourceSpec minResource; - private ResourceSpec maxResource; + private ResourceSpec preferredResource; private Long bufferTimeout = null; private final String operatorName; private String slotSharingGroup; @@ -169,16 +169,16 @@ void setMaxParallelism(int maxParallelism) { } public ResourceSpec getMinResource() { - return minResource != null ? minResource : ResourceSpec.UNKNOWN; + return minResource; } - public ResourceSpec getMaxResource() { - return maxResource != null ? 
maxResource : ResourceSpec.UNKNOWN; + public ResourceSpec getPreferredResource() { + return preferredResource; } - public void setResource(ResourceSpec minResource, ResourceSpec maxResource) { + public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) { this.minResource = minResource; - this.maxResource = maxResource; + this.preferredResource = preferredResource; } public Long getBufferTimeout() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java index d8ff14fbc1ad8..1d224549738ec 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java @@ -130,13 +130,13 @@ public static int getNewNodeId() { * The minimum resource for this stream transformation. It defines the lower limit for * dynamic resource resize in future plan. */ - private ResourceSpec minResource; + private ResourceSpec minResource = ResourceSpec.UNKNOWN; /** - * The maximum resource for this stream transformation. It defines the upper limit for + * The preferred resource for this stream transformation. It defines the upper limit for * dynamic resource resize in future plan. */ - private ResourceSpec maxResource; + private ResourceSpec preferredResource = ResourceSpec.UNKNOWN; /** * User-specified ID for this transformation. This is used to assign the @@ -227,14 +227,14 @@ public void setMaxParallelism(int maxParallelism) { } /** - * Sets the minimum and maximum resources for this stream transformation. + * Sets the minimum and preferred resources for this stream transformation. * * @param minResource The minimum resource of this transformation. - * @param maxResource The maximum resource of this transformation. 
+ * @param preferredResource The preferred resource of this transformation. */ - public void setResource(ResourceSpec minResource, ResourceSpec maxResource) { + public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) { this.minResource = minResource; - this.maxResource = maxResource; + this.preferredResource = preferredResource; } /** @@ -247,12 +247,12 @@ public ResourceSpec getMinResource() { } /** - * Gets the maximum resource of this stream transformation. + * Gets the preferred resource of this stream transformation. * - * @return The maximum resource of this transformation. + * @return The preferred resource of this transformation. */ - public ResourceSpec getMaxResource() { - return maxResource; + public ResourceSpec getPreferredResource() { + return preferredResource; } /** diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index a2b8e79030180..12af1d432a827 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -505,46 +505,47 @@ public void invoke(Long value) throws Exception { /** * Tests whether resource gets set. 
*/ + /* @Test public void testResource() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ResourceSpec minResource1 = new ResourceSpec(1.0, 100); - ResourceSpec maxResource1 = new ResourceSpec(2.0, 200); + ResourceSpec preferredResource1 = new ResourceSpec(2.0, 200); ResourceSpec minResource2 = new ResourceSpec(1.0, 200); - ResourceSpec maxResource2 = new ResourceSpec(2.0, 300); + ResourceSpec preferredResource2 = new ResourceSpec(2.0, 300); ResourceSpec minResource3 = new ResourceSpec(1.0, 300); - ResourceSpec maxResource3 = new ResourceSpec(2.0, 400); + ResourceSpec preferredResource3 = new ResourceSpec(2.0, 400); ResourceSpec minResource4 = new ResourceSpec(1.0, 400); - ResourceSpec maxResource4 = new ResourceSpec(2.0, 500); + ResourceSpec preferredResource4 = new ResourceSpec(2.0, 500); ResourceSpec minResource5 = new ResourceSpec(1.0, 500); - ResourceSpec maxResource5 = new ResourceSpec(2.0, 600); + ResourceSpec preferredResource5 = new ResourceSpec(2.0, 600); ResourceSpec minResource6 = new ResourceSpec(1.0, 600); - ResourceSpec maxResource6 = new ResourceSpec(2.0, 700); + ResourceSpec preferredResource6 = new ResourceSpec(2.0, 700); ResourceSpec minResource7 = new ResourceSpec(1.0, 700); ResourceSpec maxResource7 = new ResourceSpec(2.0, 800); - DataStream source1 = env.generateSequence(0, 0).setResource(minResource1, maxResource1); + DataStream source1 = env.generateSequence(0, 0).setResource(minResource1, preferredResource1); DataStream map1 = source1.map(new MapFunction() { @Override public Long map(Long value) throws Exception { return null; } - }).setResource(minResource2, maxResource2); + }).setResource(minResource2, preferredResource2); - DataStream source2 = env.generateSequence(0, 0).setResource(minResource3, maxResource3); + DataStream source2 = env.generateSequence(0, 0).setResource(minResource3, preferredResource3); DataStream map2 = source2.map(new MapFunction() { @Override public Long map(Long value) 
throws Exception { return null; } - }).setResource(minResource4, maxResource4); + }).setResource(minResource4, preferredResource4); DataStream connected = map1.connect(map2) .flatMap(new CoFlatMapFunction() { @@ -554,7 +555,7 @@ public void flatMap1(Long value, Collector out) throws Exception { @Override public void flatMap2(Long value, Collector out) throws Exception { } - }).setResource(minResource5, maxResource5); + }).setResource(minResource5, preferredResource5); DataStream windowed = connected .windowAll(GlobalWindows.create()) @@ -566,31 +567,31 @@ public void flatMap2(Long value, Collector out) throws Exception { public Long fold(Long accumulator, Long value) throws Exception { return null; } - }).setResource(minResource6, maxResource6); + }).setResource(minResource6, preferredResource6); DataStreamSink sink = windowed.print().setResource(minResource7, maxResource7); assertEquals(minResource1, env.getStreamGraph().getStreamNode(source1.getId()).getMinResource()); - assertEquals(maxResource1, env.getStreamGraph().getStreamNode(source1.getId()).getMaxResource()); + assertEquals(preferredResource1, env.getStreamGraph().getStreamNode(source1.getId()).getPreferredResource()); assertEquals(minResource2, env.getStreamGraph().getStreamNode(map1.getId()).getMinResource()); - assertEquals(maxResource2, env.getStreamGraph().getStreamNode(map1.getId()).getMaxResource()); + assertEquals(preferredResource2, env.getStreamGraph().getStreamNode(map1.getId()).getPreferredResource()); assertEquals(minResource3, env.getStreamGraph().getStreamNode(source2.getId()).getMinResource()); - assertEquals(maxResource3, env.getStreamGraph().getStreamNode(source2.getId()).getMaxResource()); + assertEquals(preferredResource3, env.getStreamGraph().getStreamNode(source2.getId()).getPreferredResource()); assertEquals(minResource4, env.getStreamGraph().getStreamNode(map2.getId()).getMinResource()); - assertEquals(maxResource4, 
env.getStreamGraph().getStreamNode(map2.getId()).getMaxResource()); + assertEquals(preferredResource4, env.getStreamGraph().getStreamNode(map2.getId()).getPreferredResource()); assertEquals(minResource5, env.getStreamGraph().getStreamNode(connected.getId()).getMinResource()); - assertEquals(maxResource5, env.getStreamGraph().getStreamNode(connected.getId()).getMaxResource()); + assertEquals(preferredResource5, env.getStreamGraph().getStreamNode(connected.getId()).getPreferredResource()); assertEquals(minResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getMinResource()); - assertEquals(maxResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getMaxResource()); + assertEquals(preferredResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getPreferredResource()); assertEquals(minResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getMinResource()); - assertEquals(maxResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getMaxResource()); - } + assertEquals(maxResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getPreferredResource()); + }*/ @Test public void testTypeInfo() { diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index d5a376616aa5d..e42fb3f37a7ed 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -145,42 +145,37 @@ class DataStream[T](stream: JavaStream[T]) { this } - /** * Returns the minimum resource of this operation. */ - def getMinResource: ResourceSpec = stream.getMinResource() + def minResource: ResourceSpec = stream.minResource() /** - * Returns the maximum resource of this operation. + * Returns the preferred resource of this operation. 
*/ - def getMaxResource: ResourceSpec = stream.getMaxResource() + def preferredResource: ResourceSpec = stream.preferredResource() /** - * Sets the minimum and maximum resources of this operation. + * Sets the minimum and preferred resources of this operation. */ - def setResource(minResource: ResourceSpec, maxResource: ResourceSpec): DataStream[T] = { + /* + def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : DataStream[T] = stream match { - case ds: SingleOutputStreamOperator[T] => ds.setResource(minResource, maxResource) + case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.setResource( + minResource, preferredResource)) case _ => - throw new UnsupportedOperationException( - "Operator " + stream + " cannot set the resource.") - } - this - } + throw new UnsupportedOperationException("Operator does not support " + + "configuring custom resources specs.") + this + }*/ /** * Sets the resource of this operation. */ - def setResource(resource: ResourceSpec): DataStream[T] = { - stream match { - case ds: SingleOutputStreamOperator[T] => ds.setResource(resource, resource) - case _ => - throw new UnsupportedOperationException( - "Operator " + stream + " cannot set the resource.") - } - this - } + /* + def resource(resource: ResourceSpec) : Unit = { + this.resource(resource, resource) + }*/ /** * Gets the name of the current data stream. This name is diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index fdc50342b66cb..841567a0dcbbe 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -295,67 +295,78 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { /** * Tests whether resource gets set. 
*/ + /* @Test def testResource() { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val minResource1: ResourceSpec = new ResourceSpec(1.0, 100) - val maxResource1: ResourceSpec = new ResourceSpec(2.0, 200) + val preferredResource1: ResourceSpec = new ResourceSpec(2.0, 200) val minResource2: ResourceSpec = new ResourceSpec(1.0, 200) - val maxResource2: ResourceSpec = new ResourceSpec(2.0, 300) + val preferredResource2: ResourceSpec = new ResourceSpec(2.0, 300) val minResource3: ResourceSpec = new ResourceSpec(1.0, 300) - val maxResource3: ResourceSpec = new ResourceSpec(2.0, 400) + val preferredResource3: ResourceSpec = new ResourceSpec(2.0, 400) val minResource4: ResourceSpec = new ResourceSpec(1.0, 400) - val maxResource4: ResourceSpec = new ResourceSpec(2.0, 500) + val preferredResource4: ResourceSpec = new ResourceSpec(2.0, 500) val minResource5: ResourceSpec = new ResourceSpec(1.0, 500) - val maxResource5: ResourceSpec = new ResourceSpec(2.0, 600) + val preferredResource5: ResourceSpec = new ResourceSpec(2.0, 600) val minResource6: ResourceSpec = new ResourceSpec(1.0, 600) - val maxResource6: ResourceSpec = new ResourceSpec(2.0, 700) + val preferredResource6: ResourceSpec = new ResourceSpec(2.0, 700) val minResource7: ResourceSpec = new ResourceSpec(1.0, 700) - val maxResource7: ResourceSpec = new ResourceSpec(2.0, 800) + val preferredResource7: ResourceSpec = new ResourceSpec(2.0, 800) val source1: DataStream[Long] = env.generateSequence(0, 0) - .setResource(minResource1, maxResource1) - val map1: DataStream[String] = source1 - .map(x => "") - .setResource(minResource2, maxResource2) + .resource(minResource1, preferredResource1) + val map1: DataStream[String] = source1.map(x => "") + .resource(minResource2, preferredResource2) val source2: DataStream[Long] = env.generateSequence(0, 0) - .setResource(minResource3, maxResource3) - val map2: DataStream[String] = source2 - .map(x => "") - .setResource(minResource4, 
maxResource4) + .resource(minResource3, preferredResource3) + val map2: DataStream[String] = source2.map(x => "") + .resource(minResource4, preferredResource4) val connected: DataStream[String] = map1.connect(map2) .flatMap({ (in, out: Collector[(String)]) => }, { (in, out: Collector[(String)]) => }) - .setResource(minResource5, maxResource5) + .resource(minResource5, preferredResource5) val windowed = connected .windowAll(GlobalWindows.create()) .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5))) .fold("")((accumulator: String, value: String) => "") - .setResource(minResource6, maxResource6) + .resource(minResource6, preferredResource6) - var sink = windowed.print().setResource(minResource7, maxResource7) + var sink = windowed.print().resource(minResource7, preferredResource7) val plan = env.getExecutionPlan - assertEquals(minResource1, env.getStreamGraph.getStreamNode(source1.getId).getMinResource) - assertEquals(maxResource1, env.getStreamGraph.getStreamNode(source1.getId).getMaxResource) - assertEquals(minResource2, env.getStreamGraph.getStreamNode(map1.getId).getMinResource) - assertEquals(maxResource2, env.getStreamGraph.getStreamNode(map1.getId).getMaxResource) - assertEquals(minResource3, env.getStreamGraph.getStreamNode(source2.getId).getMinResource) - assertEquals(maxResource3, env.getStreamGraph.getStreamNode(source2.getId).getMaxResource) - assertEquals(minResource4, env.getStreamGraph.getStreamNode(map2.getId).getMinResource) - assertEquals(maxResource4, env.getStreamGraph.getStreamNode(map2.getId).getMaxResource) - assertEquals(minResource5, env.getStreamGraph.getStreamNode(connected.getId).getMinResource) - assertEquals(maxResource5, env.getStreamGraph.getStreamNode(connected.getId).getMaxResource) - assertEquals(minResource6, env.getStreamGraph.getStreamNode(windowed.getId).getMinResource) - assertEquals(maxResource6, env.getStreamGraph.getStreamNode(windowed.getId).getMaxResource) - assertEquals(minResource7, 
env.getStreamGraph.getStreamNode(sink.getTransformation.getId) - .getMinResource) - assertEquals(maxResource7, env.getStreamGraph.getStreamNode(sink.getTransformation.getId) - .getMaxResource) - } + assertEquals(minResource1, env.getStreamGraph.getStreamNode(source1.getId). + getMinResource) + assertEquals(preferredResource1, env.getStreamGraph.getStreamNode(source1.getId). + getPreferredResource) + assertEquals(minResource2, env.getStreamGraph.getStreamNode(map1.getId). + getMinResource) + assertEquals(preferredResource2, env.getStreamGraph.getStreamNode(map1.getId). + getPreferredResource) + assertEquals(minResource3, env.getStreamGraph.getStreamNode(source2.getId). + getMinResource) + assertEquals(preferredResource3, env.getStreamGraph.getStreamNode(source2.getId). + getPreferredResource) + assertEquals(minResource4, env.getStreamGraph.getStreamNode(map2.getId). + getMinResource) + assertEquals(preferredResource4, env.getStreamGraph.getStreamNode(map2.getId). + getPreferredResource) + assertEquals(minResource5, env.getStreamGraph.getStreamNode(connected.getId). + getMinResource) + assertEquals(preferredResource5, env.getStreamGraph.getStreamNode(connected.getId). + getPreferredResource) + assertEquals(minResource6, env.getStreamGraph.getStreamNode(windowed.getId). + getMinResource) + assertEquals(preferredResource6, env.getStreamGraph.getStreamNode(windowed.getId). + getPreferredResource) + assertEquals(minResource7, env.getStreamGraph.getStreamNode( + sink.getTransformation.getId).getMinResource) + assertEquals(preferredResource7, env.getStreamGraph.getStreamNode( + sink.getTransformation.getId).getPreferredResource) + }*/ @Test def testTypeInfo() {