Skip to content

Commit

Permalink
Change translateToDataflow to return Operator
Browse files Browse the repository at this point in the history
Before, translateToDataflow of SingleInputOperator could only return
a single input operator of the lower layer, same for TwoInputOperator.

This change allows translateToDataflow to return more kinds of
operators.
  • Loading branch information
aljoscha committed Feb 23, 2015
1 parent 5232c56 commit 86ca692
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 14 deletions.
Expand Up @@ -111,7 +111,7 @@ else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet ins
} }




private <I, O> org.apache.flink.api.common.operators.SingleInputOperator<?, O, ?> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) { private <I, O> org.apache.flink.api.common.operators.Operator<O> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
SingleInputOperator<I, O, ?> typedOp = (SingleInputOperator<I, O, ?>) op; SingleInputOperator<I, O, ?> typedOp = (SingleInputOperator<I, O, ?>) op;
Expand All @@ -121,26 +121,30 @@ else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet ins


Operator<I> input = translate(typedInput); Operator<I> input = translate(typedInput);


org.apache.flink.api.common.operators.SingleInputOperator<?, O, ?> dataFlowOp = typedOp.translateToDataFlow(input); org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);


if (op instanceof UdfOperator<?> ) { if (op instanceof UdfOperator<?>) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
SingleInputUdfOperator<I, O, ?> udfOp = (SingleInputUdfOperator<I, O, ?>) op; SingleInputUdfOperator<I, O, ?> udfOp = (SingleInputUdfOperator<I, O, ?>) op;

// set configuration parameters // set configuration parameters
Configuration opParams = udfOp.getParameters(); Configuration opParams = udfOp.getParameters();
if (opParams != null) { if (opParams != null) {
dataFlowOp.getParameters().addAll(opParams); dataFlowOp.getParameters().addAll(opParams);
} }


// set the semantic properties if (dataFlowOp instanceof org.apache.flink.api.common.operators.SingleInputOperator) {
dataFlowOp.setSemanticProperties(udfOp.getSemanticProperties()); org.apache.flink.api.common.operators.SingleInputOperator<?, O, ?> unaryOp =
(org.apache.flink.api.common.operators.SingleInputOperator<?, O, ?>) dataFlowOp;
// set the semantic properties
unaryOp.setSemanticProperties(udfOp.getSemanticProperties());
}
} }


return dataFlowOp; return dataFlowOp;
} }


private <I1, I2, O> org.apache.flink.api.common.operators.DualInputOperator<?, ?, O, ?> translateTwoInputOperator(TwoInputOperator<?, ?, ?, ?> op) { private <I1, I2, O> org.apache.flink.api.common.operators.Operator<O> translateTwoInputOperator(TwoInputOperator<?, ?, ?, ?> op) {


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
TwoInputOperator<I1, I2, O, ?> typedOp = (TwoInputOperator<I1, I2, O, ?>) op; TwoInputOperator<I1, I2, O, ?> typedOp = (TwoInputOperator<I1, I2, O, ?>) op;
Expand All @@ -153,7 +157,7 @@ else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet ins
Operator<I1> input1 = translate(typedInput1); Operator<I1> input1 = translate(typedInput1);
Operator<I2> input2 = translate(typedInput2); Operator<I2> input2 = translate(typedInput2);


org.apache.flink.api.common.operators.DualInputOperator<?, ?, O, ?> dataFlowOp = typedOp.translateToDataFlow(input1, input2); org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input1, input2);


if (op instanceof UdfOperator<?> ) { if (op instanceof UdfOperator<?> ) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Expand All @@ -164,9 +168,13 @@ else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet ins
if (opParams != null) { if (opParams != null) {
dataFlowOp.getParameters().addAll(opParams); dataFlowOp.getParameters().addAll(opParams);
} }


// set the semantic properties if (dataFlowOp instanceof org.apache.flink.api.common.operators.DualInputOperator) {
dataFlowOp.setSemanticProperties(udfOp.getSemanticProperties()); org.apache.flink.api.common.operators.DualInputOperator<?, ?, O, ?> binaryOp =
(org.apache.flink.api.common.operators.DualInputOperator<?, ?, O, ?>) dataFlowOp;
// set the semantic properties
binaryOp.setSemanticProperties(udfOp.getSemanticProperties());
}
} }


return dataFlowOp; return dataFlowOp;
Expand Down
Expand Up @@ -62,7 +62,7 @@ public TypeInformation<IN> getInputType() {
* @param input The data flow operator that produces this operation's input data. * @param input The data flow operator that produces this operation's input data.
* @return The translated data flow operator. * @return The translated data flow operator.
*/ */
protected abstract org.apache.flink.api.common.operators.SingleInputOperator<?, OUT, ?> translateToDataFlow( protected abstract org.apache.flink.api.common.operators.Operator<OUT> translateToDataFlow(
org.apache.flink.api.common.operators.Operator<IN> input); org.apache.flink.api.common.operators.Operator<IN> input);


} }
Expand Up @@ -87,6 +87,6 @@ public TypeInformation<IN2> getInput2Type() {
* @param input2 The second input of the operation, as a common API operator. * @param input2 The second input of the operation, as a common API operator.
* @return The created common API operator. * @return The created common API operator.
*/ */
protected abstract org.apache.flink.api.common.operators.DualInputOperator<?, ?, OUT, ?> translateToDataFlow( protected abstract org.apache.flink.api.common.operators.Operator<OUT> translateToDataFlow(
org.apache.flink.api.common.operators.Operator<IN1> input1, org.apache.flink.api.common.operators.Operator<IN2> input2); org.apache.flink.api.common.operators.Operator<IN1> input1, org.apache.flink.api.common.operators.Operator<IN2> input2);
} }

0 comments on commit 86ca692

Please sign in to comment.