diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java index 7e70fd758a28f..a9dedfa6930d3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java @@ -45,6 +45,10 @@ public abstract class Operator implements Visitable> { private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances to use + private ResourceSpec minResource; // the minimum resource of the contract instance; no initializer here. + + private ResourceSpec preferredResource; // the preferred resource of the contract instance; no initializer here. + /** * The return type of the user function. */ @@ -184,6 +188,38 @@ public int getParallelism() { public void setParallelism(int parallelism) { this.parallelism = parallelism; } + + /** + * Gets the minimum resource for this contract instance. The minimum resource denotes the least amount of + * resources that the user function needs during execution. + * + * @return The minimum resource of this operator; presumably {@code null} until it has been set (TODO confirm). + */ + public ResourceSpec getMinResource() { + return this.minResource; + } + + /** + * Gets the preferred resource for this contract instance. The preferred resource denotes the maximum amount of + * resources that the user function needs during execution. + * + * @return The preferred resource of this operator; presumably {@code null} until it has been set (TODO confirm). + */ + public ResourceSpec getPreferredResource() { + return this.preferredResource; + } + + /** + * Sets the minimum and preferred resources for this contract instance. The resources denote + * how much memory and how many CPU cores the user function will consume during execution. + * + * @param minResource The minimum resource of this operator. + * @param preferredResource The preferred resource of this operator. 
+ */ + public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + this.minResource = minResource; + this.preferredResource = preferredResource; + } /** diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java index 8b419d96c87d2..3be9cc0cbb928 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.typeinfo.NothingTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -51,6 +52,10 @@ public class DataSink { private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + private ResourceSpec minResource = ResourceSpec.UNKNOWN; + + private ResourceSpec preferredResource = ResourceSpec.UNKNOWN; + private Configuration parameters; private int[] sortKeyPositions; @@ -278,4 +283,63 @@ public DataSink setParallelism(int parallelism) { return this; } + + /** + * Returns the minimum resource of this data sink. If no minimum resource has been set, + * it returns the default empty resource. + * + * @return The minimum resource of this data sink. + */ + public ResourceSpec getMinResource() { + return this.minResource; + } + + /** + * Returns the preferred resource of this data sink. If no preferred resource has been set, + * it returns the default empty resource. + * + * @return The preferred resource of this data sink. 
+ */ + public ResourceSpec getPreferredResource() { + return this.preferredResource; + } + + /** + * Sets the minimum and preferred resources for this data sink. This overrides the default empty resource. + * The minimum resource must be satisfied and the preferred resource specifies the upper bound + * for dynamic resource resize. Null or invalid arguments are rejected with an IllegalArgumentException. + * + * @param minResource The minimum resource for this data sink. + * @param preferredResource The preferred resource for this data sink. + * @return The data sink with set minimum and preferred resources. + */ + /* + public DataSink setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + Preconditions.checkArgument(minResource != null && preferredResource != null, + "The min and preferred resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), + "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); + + this.minResource = minResource; + this.preferredResource = preferredResource; + + return this; + }*/ + + /** + * Sets the resource for this data sink. This overrides the default empty minimum and preferred resources. + * + * @param resource The resource for this data sink. + * @return The data sink with set minimum and preferred resources. 
+ */ + /* + public DataSink setResource(ResourceSpec resource) { + Preconditions.checkNotNull(resource, "The resource must be not null."); + Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0."); + + this.minResource = resource; + this.preferredResource = resource; + + return this; + }*/ } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java index b97a9de35f6e4..cf0a63e576c46 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.aggregators.AggregatorRegistry; import org.apache.flink.api.common.aggregators.ConvergenceCriterion; import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -62,10 +63,13 @@ public class DeltaIteration { private String name; private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + + private ResourceSpec minResource = ResourceSpec.UNKNOWN; + + private ResourceSpec preferredResource = ResourceSpec.UNKNOWN; private boolean solutionSetUnManaged; - public DeltaIteration(ExecutionEnvironment context, TypeInformation type, DataSet solutionSet, DataSet workset, Keys keys, int maxIterations) { initialSolutionSet = solutionSet; initialWorkset = workset; @@ -192,6 +196,65 @@ public DeltaIteration parallelism(int parallelism) { public int getParallelism() { return parallelism; } + + /** + * Sets the minimum and preferred resources for the iteration. This overrides the default empty resource. 
+ * The lower and upper resource limits will be taken into account by a planned dynamic resource resize feature. + * + * @param minResource The minimum resource for the iteration. + * @param preferredResource The preferred resource for the iteration. + * @return The iteration with set minimum and preferred resources. + */ + /* + public DeltaIteration setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + Preconditions.checkArgument(minResource != null && preferredResource != null, + "The min and preferred resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), + "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); + + this.minResource = minResource; + this.preferredResource = preferredResource; + + return this; + }*/ + + /** + * Sets the resource for the iteration; the minimum and preferred resources are both set to the given value. + * The lower and upper resource limits will be taken into account by a planned dynamic resource resize feature. + * + * @param resource The resource for the iteration. + * @return The iteration with set minimum and preferred resources. + */ + /* + public DeltaIteration setResource(ResourceSpec resource) { + Preconditions.checkNotNull(resource, "The resource must be not null."); + Preconditions.checkArgument(resource.isValid(), "The values in resource must be not less than 0."); + + this.minResource = resource; + this.preferredResource = resource; + + return this; + }*/ + + /** + * Gets the minimum resource from this iteration. If no minimum resource has been set, + * it returns the default empty resource. + * + * @return The minimum resource of the iteration. + */ + public ResourceSpec getMinResource() { + return this.minResource; + } + + /** + * Gets the preferred resource from this iteration. 
If no preferred resource has been set, + * it returns the default empty resource. + * + * @return The preferred resource of the iteration. + */ + public ResourceSpec getPreferredResource() { + return this.preferredResource; + } /** * Registers an {@link Aggregator} for the iteration. Aggregators can be used to maintain simple statistics during the diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java index 323d23eb3a49e..79cae14342469 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -38,6 +39,10 @@ public abstract class Operator> extends DataSet< protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + protected ResourceSpec minResource = ResourceSpec.UNKNOWN; + + protected ResourceSpec preferredResource = ResourceSpec.UNKNOWN; + protected Operator(ExecutionEnvironment context, TypeInformation resultType) { super(context, resultType); } @@ -70,6 +75,26 @@ public int getParallelism() { return this.parallelism; } + /** + * Returns the minimum resource of this operator. If no minimum resource has been set, + * it returns the default empty resource. + * + * @return The minimum resource of this operator. + */ + public ResourceSpec minResource() { + return this.minResource; + } + + /** + * Returns the preferred resource of this operator. If no preferred resource has been set, + * it returns the default empty resource. + * + * @return The preferred resource of this operator. 
+ */ + public ResourceSpec preferredResource() { + return this.preferredResource; + } + /** * Sets the name of this operator. This overrides the default name, which is either * a generated description of the operation (such as for example "Aggregate(1:SUM, 2:MIN)") @@ -103,4 +128,46 @@ public O setParallelism(int parallelism) { O returnType = (O) this; return returnType; } + + /** + * Sets the minimum and preferred resources for this operator. This overrides the default empty resource. + * The lower and upper resource limits will be taken into account by a planned dynamic resource resize feature. + * + * @param minResource The minimum resource for this operator. + * @param preferredResource The preferred resource for this operator. + * @return The operator with set minimum and preferred resources. + */ + /* + public O setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + Preconditions.checkArgument(minResource != null && preferredResource != null, + "The min and preferred resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), + "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); + + this.minResource = minResource; + this.preferredResource = preferredResource; + + @SuppressWarnings("unchecked") + O returnType = (O) this; + return returnType; + }*/ + + /** + * Sets the resource for this operator. This overrides the default empty minimum and preferred resources. + * + * @param resource The resource for this operator. + * @return The operator with set minimum and preferred resources. 
+ */ + /* + public O setResource(ResourceSpec resource) { + Preconditions.checkNotNull(resource, "The resource must be not null."); + Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0."); + + this.minResource = resource; + this.preferredResource = resource; + + @SuppressWarnings("unchecked") + O returnType = (O) this; + return returnType; + }*/ } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java index 88c9c37e0e77a..909cd32c092d1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java @@ -63,7 +63,9 @@ private GenericDataSinkBase translate(DataSink sink) { // translate the sink itself and connect it to the input GenericDataSinkBase translatedSink = sink.translateToDataFlow(input); - + + translatedSink.setResource(sink.getMinResource(), sink.getPreferredResource()); + return translatedSink; } @@ -91,19 +93,31 @@ private Operator translate(DataSet dataSet) { Operator dataFlowOp; if (dataSet instanceof DataSource) { - dataFlowOp = ((DataSource) dataSet).translateToDataFlow(); + DataSource dataSource = (DataSource) dataSet; + dataFlowOp = dataSource.translateToDataFlow(); + dataFlowOp.setResource(dataSource.minResource(), dataSource.preferredResource()); } else if (dataSet instanceof SingleInputOperator) { - dataFlowOp = translateSingleInputOperator((SingleInputOperator) dataSet); + SingleInputOperator singleInputOperator = (SingleInputOperator) dataSet; + dataFlowOp = translateSingleInputOperator(singleInputOperator); + dataFlowOp.setResource(singleInputOperator.minResource(), singleInputOperator.preferredResource()); } else if (dataSet instanceof TwoInputOperator) { - dataFlowOp = translateTwoInputOperator((TwoInputOperator) dataSet); + TwoInputOperator 
twoInputOperator = (TwoInputOperator) dataSet; + dataFlowOp = translateTwoInputOperator(twoInputOperator); + dataFlowOp.setResource(twoInputOperator.minResource(), twoInputOperator.preferredResource()); } else if (dataSet instanceof BulkIterationResultSet) { - dataFlowOp = translateBulkIteration((BulkIterationResultSet) dataSet); + BulkIterationResultSet bulkIterationResultSet = (BulkIterationResultSet) dataSet; + dataFlowOp = translateBulkIteration(bulkIterationResultSet); + dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().minResource(), + bulkIterationResultSet.getIterationHead().preferredResource()); } else if (dataSet instanceof DeltaIterationResultSet) { - dataFlowOp = translateDeltaIteration((DeltaIterationResultSet) dataSet); + DeltaIterationResultSet deltaIterationResultSet = (DeltaIterationResultSet) dataSet; + dataFlowOp = translateDeltaIteration(deltaIterationResultSet); + dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResource(), + deltaIterationResultSet.getIterationHead().getPreferredResource()); } else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) { throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action." 
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java index a69ca3c219060..992acc9433519 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.operator; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.Operator; import org.apache.flink.api.java.typeutils.ValueTypeInfo; @@ -45,6 +46,20 @@ public void testConfigurationOfParallelism() { assertEquals(parallelism, operator.getParallelism()); } + /* + @Test + public void testConfigurationOfResource() { + Operator operator = new MockOperator(); + + // verify explicit change in resource (accessors on the Java API Operator are minResource()/preferredResource()) + ResourceSpec minResource = new ResourceSpec(1.0, 100, 0, 0, 0); + ResourceSpec preferredResource = new ResourceSpec(2.0, 200, 0, 0, 0); + operator.setResource(minResource, preferredResource); + + assertEquals(minResource, operator.minResource()); + assertEquals(preferredResource, operator.preferredResource()); + }*/ + private class MockOperator extends Operator { public MockOperator() { super(ExecutionEnvironment.createCollectionsEnvironment(), ValueTypeInfo.NULL_VALUE_TYPE_INFO); diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java index b30fa36ba026e..4ef91b328394b 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java @@ -19,6 +19,7 @@ package org.apache.flink.optimizer.plan; import org.apache.flink.api.common.operators.Operator; +import 
org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.costs.Costs; @@ -308,6 +309,14 @@ public void setParallelism(int parallelism) { public int getParallelism() { return this.parallelism; } + + public ResourceSpec getMinResource() { + return this.template.getOperator().getMinResource(); + } + + public ResourceSpec getPreferredResource() { + return this.template.getOperator().getPreferredResource(); + } public long getGuaranteedAvailableMemory() { return this.template.getMinimalMemoryAcrossAllSubTasks(); diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 4e7be042901ff..5cfb6018b4bcb 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.common.accumulators.SerializedListAccumulator import org.apache.flink.api.common.aggregators.Aggregator import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat} -import org.apache.flink.api.common.operators.{Keys, Order} +import org.apache.flink.api.common.operators.{ResourceSpec, Keys, Order} import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod @@ -177,6 +177,53 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { "parallelism.") } + /** + * Sets the minimum and preferred resources of this operation. 
+ */ + /* + def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : Unit = { + javaSet match { + case ds: DataSource[_] => ds.setResource(minResource, preferredResource) + case op: Operator[_, _] => op.setResource(minResource, preferredResource) + case di: DeltaIterationResultSet[_, _] => + di.getIterationHead.setResource(minResource, preferredResource) + case _ => + throw new UnsupportedOperationException("Operator does not support " + + "configuring custom resources specs.") + } + this + }*/ + + /** + * Sets the resource of this operation. + */ + /* + def resource(resource: ResourceSpec) : Unit = { + this.resource(resource, resource) + }*/ + + /** + * Returns the minimum resource of this operation. + */ + def minResource: ResourceSpec = javaSet match { + case ds: DataSource[_] => ds.minResource() + case op: Operator[_, _] => op.minResource + case _ => + throw new UnsupportedOperationException("Operator does not support " + + "configuring custom resources specs.") + } + + /** + * Returns the preferred resource of this operation. + */ + def preferredResource: ResourceSpec = javaSet match { + case ds: DataSource[_] => ds.preferredResource() + case op: Operator[_, _] => op.preferredResource + case _ => + throw new UnsupportedOperationException("Operator does not support " + + "configuring custom resources specs.") + } + /** * Registers an [[org.apache.flink.api.common.aggregators.Aggregator]] * for the iteration. 
Aggregators can be used to maintain simple statistics during the diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 204557db0100a..ae1c39a09b920 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -142,6 +143,24 @@ public int getParallelism() { return transformation.getParallelism(); } + /** + * Gets the minimum resource for this operator. + * + * @return The minimum resource set for this operator. + */ + public ResourceSpec minResource() { + return transformation.getMinResource(); + } + + /** + * Gets the preferred resource for this operator. + * + * @return The preferred resource set for this operator. + */ + public ResourceSpec preferredResource() { + return transformation.getPreferredResource(); + } + /** * Gets the type of the stream. 
* diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index 0c9378bffb0ae..69e21d6d18fad 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -113,6 +113,42 @@ public DataStreamSink setParallelism(int parallelism) { return this; } + /** + * Sets the minimum and preferred resources for this sink. The lower and upper resource limits will + * be taken into account by a planned resource resize feature. + * + * @param minResource The minimum resource for this sink. + * @param preferredResource The preferred resource for this sink. + * @return The sink with set minimum and preferred resources. + */ + /* + public DataStreamSink setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + Preconditions.checkArgument(minResource != null && preferredResource != null, + "The min and preferred resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), + "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); + + transformation.setResource(minResource, preferredResource); + + return this; + }*/ + + /** + * Sets the resource for this sink, the minimum and preferred resources are the same by default. + * + * @param resource The resource for this sink. + * @return The sink with set minimum and preferred resources. 
+ */ + /* + public DataStreamSink setResource(ResourceSpec resource) { + Preconditions.checkNotNull(resource, "The resource must be not null."); + Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0."); + + transformation.setResource(resource, resource); + + return this; + }*/ + /** * Turns off chaining for this operator so thread co-location will not be * used as an optimization. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 9dd60b708b258..d856603b6f72d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -154,6 +154,42 @@ public SingleOutputStreamOperator setMaxParallelism(int maxParallelism) { return this; } + /** + * Sets the minimum and preferred resources for this operator, and the lower and upper resource limits will + * be considered in dynamic resource resize feature for future plan. + * + * @param minResource The minimum resource for this operator. + * @param preferredResource The preferred resource for this operator. + * @return The operator with set minimum and preferred resources. 
+ */ + /* + public SingleOutputStreamOperator setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + Preconditions.checkArgument(minResource != null && preferredResource != null, + "The min and preferred resources must be not null."); + Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), + "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); + + transformation.setResource(minResource, preferredResource); + + return this; + }*/ + + /** + * Sets the resource for this operator; the minimum and preferred resources are both set to the given value. + * + * @param resource The resource for this operator. + * @return The operator with set minimum and preferred resources. + */ + /* + public SingleOutputStreamOperator setResource(ResourceSpec resource) { + Preconditions.checkNotNull(resource, "The resource must be not null."); + Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0."); + + transformation.setResource(resource, resource); + + return this; + }*/ + private boolean canBeParallel() { return !nonParallel; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 696c04be4b1b3..fcbc607a8e276 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -32,6 +32,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; 
import org.apache.flink.api.java.functions.KeySelector; @@ -413,6 +414,12 @@ public void setMaxParallelism(int vertexID, int maxParallelism) { } } + public void setResource(int vertexID, ResourceSpec minResource, ResourceSpec preferredResource) { + if (getStreamNode(vertexID) != null) { + getStreamNode(vertexID).setResource(minResource, preferredResource); + } + } + public void setOneInputStateKey(Integer vertexID, KeySelector keySelector, TypeSerializer keySerializer) { StreamNode node = getStreamNode(vertexID); node.setStatePartitioner1(keySelector); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index ddd05154d1006..af92421d5f6d0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -202,6 +202,10 @@ private Collection transform(StreamTransformation transform) { streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash()); } + if (transform.getMinResource() != null && transform.getPreferredResource() != null) { + streamGraph.setResource(transform.getId(), transform.getMinResource(), transform.getPreferredResource()); + } + return transformedIds; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 19a369931c325..0bf9adfb82369 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import 
org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -48,6 +49,8 @@ public class StreamNode implements Serializable { * dynamic scaling and the number of key groups used for partitioned state. */ private int maxParallelism; + private ResourceSpec minResource; + private ResourceSpec preferredResource; private Long bufferTimeout = null; private final String operatorName; private String slotSharingGroup; @@ -165,6 +168,19 @@ void setMaxParallelism(int maxParallelism) { this.maxParallelism = maxParallelism; } + public ResourceSpec getMinResource() { + return minResource; + } + + public ResourceSpec getPreferredResource() { + return preferredResource; + } + + public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + this.minResource = minResource; + this.preferredResource = preferredResource; + } + public Long getBufferTimeout() { return bufferTimeout != null ? 
bufferTimeout : env.getBufferTimeout(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java index 5e1b3e27aa145..1d224549738ec 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -125,6 +126,18 @@ public static int getNewNodeId() { */ private int maxParallelism = -1; + /** + * The minimum resource for this stream transformation. It defines the lower limit for + * dynamic resource resize in future plan. + */ + private ResourceSpec minResource = ResourceSpec.UNKNOWN; + + /** + * The preferred resource for this stream transformation. It defines the upper limit for + * dynamic resource resize in future plan. + */ + private ResourceSpec preferredResource = ResourceSpec.UNKNOWN; + /** * User-specified ID for this transformation. This is used to assign the * same operator ID across job restarts. There is also the automatically @@ -213,6 +226,35 @@ public void setMaxParallelism(int maxParallelism) { this.maxParallelism = maxParallelism; } + /** + * Sets the minimum and preferred resources for this stream transformation. + * + * @param minResource The minimum resource of this transformation. + * @param preferredResource The preferred resource of this transformation. 
+ */ + public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) { + this.minResource = minResource; + this.preferredResource = preferredResource; + } + + /** + * Gets the minimum resource of this stream transformation. + * + * @return The minimum resource of this transformation. + */ + public ResourceSpec getMinResource() { + return minResource; + } + + /** + * Gets the preferred resource of this stream transformation. + * + * @return The preferred resource of this transformation. + */ + public ResourceSpec getPreferredResource() { + return preferredResource; + } + /** * Sets an user provided hash for this operator. This will be used AS IS the create the JobVertexID. *

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index eaac6b8cc9fac..12af1d432a827 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -501,6 +502,97 @@ public void invoke(Long value) throws Exception { assertEquals(4, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism()); } + /** + * Tests whether resource gets set. 
+ */ + /* + @Test + public void testResource() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + ResourceSpec minResource1 = new ResourceSpec(1.0, 100); + ResourceSpec preferredResource1 = new ResourceSpec(2.0, 200); + + ResourceSpec minResource2 = new ResourceSpec(1.0, 200); + ResourceSpec preferredResource2 = new ResourceSpec(2.0, 300); + + ResourceSpec minResource3 = new ResourceSpec(1.0, 300); + ResourceSpec preferredResource3 = new ResourceSpec(2.0, 400); + + ResourceSpec minResource4 = new ResourceSpec(1.0, 400); + ResourceSpec preferredResource4 = new ResourceSpec(2.0, 500); + + ResourceSpec minResource5 = new ResourceSpec(1.0, 500); + ResourceSpec preferredResource5 = new ResourceSpec(2.0, 600); + + ResourceSpec minResource6 = new ResourceSpec(1.0, 600); + ResourceSpec preferredResource6 = new ResourceSpec(2.0, 700); + + ResourceSpec minResource7 = new ResourceSpec(1.0, 700); + ResourceSpec maxResource7 = new ResourceSpec(2.0, 800); + + DataStream source1 = env.generateSequence(0, 0).setResource(minResource1, preferredResource1); + DataStream map1 = source1.map(new MapFunction() { + @Override + public Long map(Long value) throws Exception { + return null; + } + }).setResource(minResource2, preferredResource2); + + DataStream source2 = env.generateSequence(0, 0).setResource(minResource3, preferredResource3); + DataStream map2 = source2.map(new MapFunction() { + @Override + public Long map(Long value) throws Exception { + return null; + } + }).setResource(minResource4, preferredResource4); + + DataStream connected = map1.connect(map2) + .flatMap(new CoFlatMapFunction() { + @Override + public void flatMap1(Long value, Collector out) throws Exception { + } + @Override + public void flatMap2(Long value, Collector out) throws Exception { + } + }).setResource(minResource5, preferredResource5); + + DataStream windowed = connected + .windowAll(GlobalWindows.create()) + .trigger(PurgingTrigger.of(CountTrigger.of(10))) + 
.fold(0L, new FoldFunction() { + private static final long serialVersionUID = 1L; + + @Override + public Long fold(Long accumulator, Long value) throws Exception { + return null; + } + }).setResource(minResource6, preferredResource6); + + DataStreamSink sink = windowed.print().setResource(minResource7, maxResource7); + + assertEquals(minResource1, env.getStreamGraph().getStreamNode(source1.getId()).getMinResource()); + assertEquals(preferredResource1, env.getStreamGraph().getStreamNode(source1.getId()).getPreferredResource()); + + assertEquals(minResource2, env.getStreamGraph().getStreamNode(map1.getId()).getMinResource()); + assertEquals(preferredResource2, env.getStreamGraph().getStreamNode(map1.getId()).getPreferredResource()); + + assertEquals(minResource3, env.getStreamGraph().getStreamNode(source2.getId()).getMinResource()); + assertEquals(preferredResource3, env.getStreamGraph().getStreamNode(source2.getId()).getPreferredResource()); + + assertEquals(minResource4, env.getStreamGraph().getStreamNode(map2.getId()).getMinResource()); + assertEquals(preferredResource4, env.getStreamGraph().getStreamNode(map2.getId()).getPreferredResource()); + + assertEquals(minResource5, env.getStreamGraph().getStreamNode(connected.getId()).getMinResource()); + assertEquals(preferredResource5, env.getStreamGraph().getStreamNode(connected.getId()).getPreferredResource()); + + assertEquals(minResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getMinResource()); + assertEquals(preferredResource6, env.getStreamGraph().getStreamNode(windowed.getId()).getPreferredResource()); + + assertEquals(minResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getMinResource()); + assertEquals(maxResource7, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getPreferredResource()); + }*/ + @Test public void testTypeInfo() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index ba92f863d9cf9..e42fb3f37a7ed 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -22,6 +22,7 @@ import org.apache.flink.annotation.{Internal, Public, PublicEvolving} import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner} import org.apache.flink.api.common.io.OutputFormat +import org.apache.flink.api.common.operators.ResourceSpec import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.java.tuple.{Tuple => JavaTuple} @@ -144,6 +145,38 @@ class DataStream[T](stream: JavaStream[T]) { this } + /** + * Returns the minimum resource of this operation. + */ + def minResource: ResourceSpec = stream.minResource() + + /** + * Returns the preferred resource of this operation. + */ + def preferredResource: ResourceSpec = stream.preferredResource() + + /** + * Sets the minimum and preferred resources of this operation. + */ + /* + def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : DataStream[T] = + stream match { + case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.setResource( + minResource, preferredResource)) + case _ => + throw new UnsupportedOperationException("Operator does not support " + + "configuring custom resources specs.") + this + }*/ + + /** + * Sets the resource of this operation. + */ + /* + def resource(resource: ResourceSpec) : Unit = { + this.resource(resource, resource) + }*/ + /** * Gets the name of the current data stream. This name is * used by the visualization and logging during runtime. 
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index adb59f2f1bf42..841567a0dcbbe 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala import java.lang import org.apache.flink.api.common.functions._ +import org.apache.flink.api.common.operators.ResourceSpec import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.functions.ProcessFunction @@ -34,7 +35,7 @@ import org.apache.flink.streaming.api.windowing.windows.GlobalWindow import org.apache.flink.streaming.runtime.partitioner._ import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.util.Collector -import org.junit.Assert.fail +import org.junit.Assert._ import org.junit.Test class DataStreamTest extends StreamingMultipleProgramsTestBase { @@ -291,6 +292,82 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { assert(4 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) } + /** + * Tests whether resource gets set. 
+ */ + /* + @Test + def testResource() { + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + + val minResource1: ResourceSpec = new ResourceSpec(1.0, 100) + val preferredResource1: ResourceSpec = new ResourceSpec(2.0, 200) + val minResource2: ResourceSpec = new ResourceSpec(1.0, 200) + val preferredResource2: ResourceSpec = new ResourceSpec(2.0, 300) + val minResource3: ResourceSpec = new ResourceSpec(1.0, 300) + val preferredResource3: ResourceSpec = new ResourceSpec(2.0, 400) + val minResource4: ResourceSpec = new ResourceSpec(1.0, 400) + val preferredResource4: ResourceSpec = new ResourceSpec(2.0, 500) + val minResource5: ResourceSpec = new ResourceSpec(1.0, 500) + val preferredResource5: ResourceSpec = new ResourceSpec(2.0, 600) + val minResource6: ResourceSpec = new ResourceSpec(1.0, 600) + val preferredResource6: ResourceSpec = new ResourceSpec(2.0, 700) + val minResource7: ResourceSpec = new ResourceSpec(1.0, 700) + val preferredResource7: ResourceSpec = new ResourceSpec(2.0, 800) + + val source1: DataStream[Long] = env.generateSequence(0, 0) + .resource(minResource1, preferredResource1) + val map1: DataStream[String] = source1.map(x => "") + .resource(minResource2, preferredResource2) + val source2: DataStream[Long] = env.generateSequence(0, 0) + .resource(minResource3, preferredResource3) + val map2: DataStream[String] = source2.map(x => "") + .resource(minResource4, preferredResource4) + + val connected: DataStream[String] = map1.connect(map2) + .flatMap({ (in, out: Collector[(String)]) => }, { (in, out: Collector[(String)]) => }) + .resource(minResource5, preferredResource5) + + val windowed = connected + .windowAll(GlobalWindows.create()) + .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5))) + .fold("")((accumulator: String, value: String) => "") + .resource(minResource6, preferredResource6) + + var sink = windowed.print().resource(minResource7, preferredResource7) + + val plan = env.getExecutionPlan + 
+ assertEquals(minResource1, env.getStreamGraph.getStreamNode(source1.getId). + getMinResource) + assertEquals(preferredResource1, env.getStreamGraph.getStreamNode(source1.getId). + getPreferredResource) + assertEquals(minResource2, env.getStreamGraph.getStreamNode(map1.getId). + getMinResource) + assertEquals(preferredResource2, env.getStreamGraph.getStreamNode(map1.getId). + getPreferredResource) + assertEquals(minResource3, env.getStreamGraph.getStreamNode(source2.getId). + getMinResource) + assertEquals(preferredResource3, env.getStreamGraph.getStreamNode(source2.getId). + getPreferredResource) + assertEquals(minResource4, env.getStreamGraph.getStreamNode(map2.getId). + getMinResource) + assertEquals(preferredResource4, env.getStreamGraph.getStreamNode(map2.getId). + getPreferredResource) + assertEquals(minResource5, env.getStreamGraph.getStreamNode(connected.getId). + getMinResource) + assertEquals(preferredResource5, env.getStreamGraph.getStreamNode(connected.getId). + getPreferredResource) + assertEquals(minResource6, env.getStreamGraph.getStreamNode(windowed.getId). + getMinResource) + assertEquals(preferredResource6, env.getStreamGraph.getStreamNode(windowed.getId). + getPreferredResource) + assertEquals(minResource7, env.getStreamGraph.getStreamNode( + sink.getTransformation.getId).getMinResource) + assertEquals(preferredResource7, env.getStreamGraph.getStreamNode( + sink.getTransformation.getId).getPreferredResource) + }*/ + @Test def testTypeInfo() { val env = StreamExecutionEnvironment.getExecutionEnvironment