Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> {

private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances to use

private ResourceSpec minResource; // the minimum resource of the contract instance.

private ResourceSpec preferredResource; // the preferred resource of the contract instance.

/**
* The return type of the user function.
*/
Expand Down Expand Up @@ -184,6 +188,38 @@ public int getParallelism() {
public void setParallelism(int parallelism) {
this.parallelism = parallelism;
}

/**
* Gets the minimum resource for this contract instance. The minimum resource denotes how many
* resources will be needed in the minimum for the user function during the execution.
*
* @return The minimum resource of this operator.
*/
public ResourceSpec getMinResource() {
return this.minResource;
}

/**
* Gets the preferred resource for this contract instance. The preferred resource denotes how many
* resources will be needed in the maximum for the user function during the execution.
*
* @return The preferred resource of this operator.
*/
public ResourceSpec getPreferredResource() {
return this.preferredResource;
}

/**
* Sets the minimum and preferred resources for this contract instance. The resource denotes
* how many memories and cpu cores of the user function will be consumed during the execution.
*
* @param minResource The minimum resource of this operator.
* @param preferredResource The preferred resource of this operator.
*/
public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
this.minResource = minResource;
this.preferredResource = preferredResource;
}


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand All @@ -51,6 +52,10 @@ public class DataSink<T> {

private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;

private ResourceSpec minResource = ResourceSpec.UNKNOWN;

private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;

private Configuration parameters;

private int[] sortKeyPositions;
Expand Down Expand Up @@ -278,4 +283,63 @@ public DataSink<T> setParallelism(int parallelism) {

return this;
}

/**
* Returns the minimum resource of this data sink. If no minimum resource has been set,
* it returns the default empty resource.
*
* @return The minimum resource of this data sink.
*/
public ResourceSpec getMinResource() {
return this.minResource;
}

/**
* Returns the preferred resource of this data sink. If no preferred resource has been set,
* it returns the default empty resource.
*
* @return The preferred resource of this data sink.
*/
public ResourceSpec getPreferredResource() {
return this.preferredResource;
}

/**
* Sets the minimum and preferred resources for this data sink. This overrides the default empty resource.
* The minimum resource must be satisfied and the preferred resource specifies the upper bound
* for dynamic resource resize.
*
* @param minResource The minimum resource for this data sink.
* @param preferredResource The preferred resource for this data sink.
* @return The data sink with set minimum and preferred resources.
*/
/*
public DataSink<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
Preconditions.checkNotNull(minResource != null && preferredResource != null,
"The min and preferred resources must be not null.");
Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");

this.minResource = minResource;
this.preferredResource = preferredResource;

return this;
}*/

/**
* Sets the resource for this data sink. This overrides the default empty minimum and preferred resources.
*
* @param resource The resource for this data sink.
* @return The data sink with set minimum and preferred resources.
*/
/*
public DataSink<T> setResource(ResourceSpec resource) {
Preconditions.checkNotNull(resource != null, "The resource must be not null.");
Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");

this.minResource = resource;
this.preferredResource = resource;

return this;
}*/
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.api.common.aggregators.AggregatorRegistry;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand Down Expand Up @@ -62,10 +63,13 @@ public class DeltaIteration<ST, WT> {
private String name;

private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;

private ResourceSpec minResource = ResourceSpec.UNKNOWN;

private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;

private boolean solutionSetUnManaged;


public DeltaIteration(ExecutionEnvironment context, TypeInformation<ST> type, DataSet<ST> solutionSet, DataSet<WT> workset, Keys<ST> keys, int maxIterations) {
initialSolutionSet = solutionSet;
initialWorkset = workset;
Expand Down Expand Up @@ -192,6 +196,65 @@ public DeltaIteration<ST, WT> parallelism(int parallelism) {
public int getParallelism() {
return parallelism;
}

/**
* Sets the minimum and preferred resources for the iteration. This overrides the default empty resource.
* The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
*
* @param minResource The minimum resource for the iteration.
* @param preferredResource The preferred resource for the iteration.
* @return The iteration with set minimum and preferred resources.
*/
/*
public DeltaIteration<ST, WT> setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
Preconditions.checkNotNull(minResource != null && preferredResource != null,
"The min and preferred resources must be not null.");
Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");

this.minResource = minResource;
this.preferredResource = preferredResource;

return this;
}*/

/**
* Sets the resource for the iteration, and the minimum and preferred resources are the same by default.
* The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
*
* @param resource The resource for the iteration.
* @return The iteration with set minimum and preferred resources.
*/
/*
public DeltaIteration<ST, WT> setResource(ResourceSpec resource) {
Preconditions.checkNotNull(resource != null, "The resource must be not null.");
Preconditions.checkArgument(resource.isValid(), "The values in resource must be not less than 0.");

this.minResource = resource;
this.preferredResource = resource;

return this;
}*/

/**
* Gets the minimum resource from this iteration. If no minimum resource has been set,
* it returns the default empty resource.
*
* @return The minimum resource of the iteration.
*/
public ResourceSpec getMinResource() {
return this.minResource;
}

/**
* Gets the preferred resource from this iteration. If no preferred resource has been set,
* it returns the default empty resource.
*
* @return The preferred resource of the iteration.
*/
public ResourceSpec getPreferredResource() {
return this.preferredResource;
}

/**
* Registers an {@link Aggregator} for the iteration. Aggregators can be used to maintain simple statistics during the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand All @@ -38,6 +39,10 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<

protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;

protected ResourceSpec minResource = ResourceSpec.UNKNOWN;

protected ResourceSpec preferredResource = ResourceSpec.UNKNOWN;

protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) {
super(context, resultType);
}
Expand Down Expand Up @@ -70,6 +75,26 @@ public int getParallelism() {
return this.parallelism;
}

/**
* Returns the minimum resource of this operator. If no minimum resource has been set,
* it returns the default empty resource.
*
* @return The minimum resource of this operator.
*/
public ResourceSpec minResource() {
return this.minResource;
}

/**
* Returns the preferred resource of this operator. If no preferred resource has been set,
* it returns the default empty resource.
*
* @return The preferred resource of this operator.
*/
public ResourceSpec preferredResource() {
return this.preferredResource;
}

/**
* Sets the name of this operator. This overrides the default name, which is either
* a generated description of the operation (such as for example "Aggregate(1:SUM, 2:MIN)")
Expand Down Expand Up @@ -103,4 +128,46 @@ public O setParallelism(int parallelism) {
O returnType = (O) this;
return returnType;
}

/**
* Sets the minimum and preferred resources for this operator. This overrides the default empty resource.
* The lower and upper resource limits will be considered in dynamic resource resize feature for future plan.
*
* @param minResource The minimum resource for this operator.
* @param preferredResource The preferred resource for this operator.
* @return The operator with set minimum and preferred resources.
*/
/*
public O setResource(ResourceSpec minResource, ResourceSpec preferredResource) {
Preconditions.checkNotNull(minResource != null && preferredResource != null,
"The min and preferred resources must be not null.");
Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
"The values in resource must be not less than 0 and the preferred resource must be greater than the min resource.");

this.minResource = minResource;
this.preferredResource = preferredResource;

@SuppressWarnings("unchecked")
O returnType = (O) this;
return returnType;
}*/

/**
* Sets the resource for this operator. This overrides the default empty minimum and preferred resources.
*
* @param resource The resource for this operator.
* @return The operator with set minimum and preferred resources.
*/
/*
public O setResource(ResourceSpec resource) {
Preconditions.checkNotNull(resource != null, "The resource must be not null.");
Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0.");

this.minResource = resource;
this.preferredResource = resource;

@SuppressWarnings("unchecked")
O returnType = (O) this;
return returnType;
}*/
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ private <T> GenericDataSinkBase<T> translate(DataSink<T> sink) {

// translate the sink itself and connect it to the input
GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);


translatedSink.setResource(sink.getMinResource(), sink.getPreferredResource());

return translatedSink;
}

Expand Down Expand Up @@ -91,19 +93,31 @@ private <T> Operator<T> translate(DataSet<T> dataSet) {
Operator<T> dataFlowOp;

if (dataSet instanceof DataSource) {
dataFlowOp = ((DataSource<T>) dataSet).translateToDataFlow();
DataSource<T> dataSource = (DataSource<T>) dataSet;
dataFlowOp = dataSource.translateToDataFlow();
dataFlowOp.setResource(dataSource.minResource(), dataSource.preferredResource());
}
else if (dataSet instanceof SingleInputOperator) {
dataFlowOp = translateSingleInputOperator((SingleInputOperator<?, ?, ?>) dataSet);
SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet;
dataFlowOp = translateSingleInputOperator(singleInputOperator);
dataFlowOp.setResource(singleInputOperator.minResource, singleInputOperator.preferredResource());
}
else if (dataSet instanceof TwoInputOperator) {
dataFlowOp = translateTwoInputOperator((TwoInputOperator<?, ?, ?, ?>) dataSet);
TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet;
dataFlowOp = translateTwoInputOperator(twoInputOperator);
dataFlowOp.setResource(twoInputOperator.minResource(), twoInputOperator.preferredResource());
}
else if (dataSet instanceof BulkIterationResultSet) {
dataFlowOp = translateBulkIteration((BulkIterationResultSet<?>) dataSet);
BulkIterationResultSet bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet;
dataFlowOp = translateBulkIteration(bulkIterationResultSet);
dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().minResource(),
bulkIterationResultSet.getIterationHead().preferredResource());
}
else if (dataSet instanceof DeltaIterationResultSet) {
dataFlowOp = translateDeltaIteration((DeltaIterationResultSet<?, ?>) dataSet);
DeltaIterationResultSet deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet;
dataFlowOp = translateDeltaIteration(deltaIterationResultSet);
dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResource(),
deltaIterationResultSet.getIterationHead().getPreferredResource());
}
else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) {
throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.java.operator;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
Expand All @@ -45,6 +46,20 @@ public void testConfigurationOfParallelism() {
assertEquals(parallelism, operator.getParallelism());
}

/*
@Test
public void testConfigurationOfResource() {
Operator operator = new MockOperator();

// verify explicit change in resource
ResourceSpec minResource = new ResourceSpec(1.0, 100, 0, 0, 0);
ResourceSpec preferredResource = new ResourceSpec(2.0, 200, 0, 0, 0);
operator.setResource(minResource, preferredResource);

assertEquals(minResource, operator.getMinResource());
assertEquals(preferredResource, operator.getPreferredResource());
}*/

private class MockOperator extends Operator {
public MockOperator() {
super(ExecutionEnvironment.createCollectionsEnvironment(), ValueTypeInfo.NULL_VALUE_TYPE_INFO);
Expand Down
Loading