diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java index a5335780756c9..232f97a5e9577 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java @@ -22,8 +22,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; import org.apache.flink.streaming.api.operators.BoundedMultiInput; -import org.apache.flink.streaming.api.operators.InputSelectable; -import org.apache.flink.streaming.api.operators.InputSelection; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; @@ -38,7 +36,7 @@ @Internal public class EmbeddedPythonBatchKeyedCoBroadcastProcessOperator extends EmbeddedPythonKeyedCoProcessOperator - implements BoundedMultiInput, InputSelectable { + implements BoundedMultiInput { private static final long serialVersionUID = 1L; @@ -60,15 +58,6 @@ public void endInput(int inputId) throws Exception { } } - @Override - public InputSelection nextSelection() { - if (!isBroadcastSideDone) { - return InputSelection.SECOND; - } else { - return InputSelection.FIRST; - } - } - @Override public void processElement1(StreamRecord element) throws Exception { Preconditions.checkState( diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java index 9184265daec88..c75d16625295c 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java @@ -22,8 +22,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; import org.apache.flink.streaming.api.operators.BoundedMultiInput; -import org.apache.flink.streaming.api.operators.InputSelectable; -import org.apache.flink.streaming.api.operators.InputSelection; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -38,8 +36,7 @@ */ @Internal public class ExternalPythonBatchKeyedCoBroadcastProcessOperator - extends ExternalPythonKeyedCoProcessOperator - implements BoundedMultiInput, InputSelectable { + extends ExternalPythonKeyedCoProcessOperator implements BoundedMultiInput { private static final long serialVersionUID = 1L; @@ -61,15 +58,6 @@ public void endInput(int inputId) throws Exception { } } - @Override - public InputSelection nextSelection() { - if (!isBroadcastSideDone) { - return InputSelection.SECOND; - } else { - return InputSelection.FIRST; - } - } - @Override public void processElement1(StreamRecord element) throws Exception { Preconditions.checkState( diff --git a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java index 9fac56d246fbe..a17373add23bb 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; import org.apache.flink.python.PythonOptions; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator; import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonBatchKeyedCoBroadcastProcessOperator; @@ -29,6 +30,7 @@ import org.apache.flink.streaming.api.transformations.python.DelegateOperatorTransformation; import org.apache.flink.streaming.api.transformations.python.PythonKeyedBroadcastStateTransformation; import org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator; +import org.apache.flink.streaming.runtime.translators.BatchExecutionUtils; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -76,15 +78,24 @@ protected Collection translateForBatchInternal( DelegateOperatorTransformation.configureOperator(transformation, operator); - return translateInternal( - transformation, - transformation.getRegularInput(), - transformation.getBroadcastInput(), - SimpleOperatorFactory.of(operator), - transformation.getStateKeyType(), - transformation.getKeySelector(), - null, - context); + Collection result = + translateInternal( + transformation, + transformation.getRegularInput(), + transformation.getBroadcastInput(), + SimpleOperatorFactory.of(operator), + transformation.getStateKeyType(), + transformation.getKeySelector(), + null, + context); + + BatchExecutionUtils.applyBatchExecutionSettings( + transformation.getId(), + context, + StreamConfig.InputRequirement.SORTED, + StreamConfig.InputRequirement.PASS_THROUGH); + + return result; } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java index 0a922ae3088e6..0eaa0b65310d1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java @@ -39,10 +39,10 @@ import static org.apache.flink.util.Preconditions.checkState; /** A utility class for applying sorting inputs. */ -class BatchExecutionUtils { +public class BatchExecutionUtils { private static final Logger LOG = LoggerFactory.getLogger(BatchExecutionUtils.class); - static void applyBatchExecutionSettings( + public static void applyBatchExecutionSettings( int transformationId, TransformationTranslator.Context context, StreamConfig.InputRequirement... inputRequirements) {