From 192829523dd2754f8302af98a081212d81cc8ef1 Mon Sep 17 00:00:00 2001 From: Juntao Hu Date: Mon, 6 Mar 2023 20:48:56 +0800 Subject: [PATCH 1/2] [FLINK-31337][Python] Fix python keyed broadcast batch operator input not sorted --- .../flink/python/util/PythonConfigUtil.java | 4 +-- ...nBatchKeyedCoBroadcastProcessOperator.java | 13 +------- ...nBatchKeyedCoBroadcastProcessOperator.java | 14 +-------- .../PythonBroadcastStateTransformation.java | 4 +-- ...thonKeyedBroadcastStateTransformation.java | 3 +- ...roadcastStateTransformationTranslator.java | 3 +- ...roadcastStateTransformationTranslator.java | 31 ++++++++++++------- 7 files changed, 29 insertions(+), 43 deletions(-) rename flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/{python => }/PythonBroadcastStateTransformationTranslator.java (97%) rename flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/{python => }/PythonKeyedBroadcastStateTransformationTranslator.java (87%) diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java index 4666c5df79023..7bdea8ecd2809 100644 --- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java +++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java @@ -48,8 +48,8 @@ import org.apache.flink.streaming.api.utils.ByteArrayWrapper; import org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; -import org.apache.flink.streaming.runtime.translators.python.PythonBroadcastStateTransformationTranslator; -import org.apache.flink.streaming.runtime.translators.python.PythonKeyedBroadcastStateTransformationTranslator; +import org.apache.flink.streaming.runtime.translators.PythonBroadcastStateTransformationTranslator; +import org.apache.flink.streaming.runtime.translators.PythonKeyedBroadcastStateTransformationTranslator; import org.apache.flink.util.OutputTag; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java index a5335780756c9..232f97a5e9577 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonBatchKeyedCoBroadcastProcessOperator.java @@ -22,8 +22,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; import org.apache.flink.streaming.api.operators.BoundedMultiInput; -import org.apache.flink.streaming.api.operators.InputSelectable; -import org.apache.flink.streaming.api.operators.InputSelection; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; @@ -38,7 +36,7 @@ @Internal public class EmbeddedPythonBatchKeyedCoBroadcastProcessOperator extends EmbeddedPythonKeyedCoProcessOperator - implements BoundedMultiInput, InputSelectable { + implements BoundedMultiInput { private static final long serialVersionUID = 1L; @@ -60,15 +58,6 @@ public void endInput(int inputId) throws Exception { } } - @Override - public InputSelection nextSelection() { - if (!isBroadcastSideDone) { - return InputSelection.SECOND; - } else { - return InputSelection.FIRST; - } - } - @Override public void processElement1(StreamRecord element) throws Exception { Preconditions.checkState( diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java index 9184265daec88..c75d16625295c 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/ExternalPythonBatchKeyedCoBroadcastProcessOperator.java @@ -22,8 +22,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; import org.apache.flink.streaming.api.operators.BoundedMultiInput; -import org.apache.flink.streaming.api.operators.InputSelectable; -import org.apache.flink.streaming.api.operators.InputSelection; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -38,8 +36,7 @@ */ @Internal public class ExternalPythonBatchKeyedCoBroadcastProcessOperator - extends ExternalPythonKeyedCoProcessOperator - implements BoundedMultiInput, InputSelectable { + extends ExternalPythonKeyedCoProcessOperator implements BoundedMultiInput { private static final long serialVersionUID = 1L; @@ -61,15 +58,6 @@ public void endInput(int inputId) throws Exception { } } - @Override - public InputSelection nextSelection() { - if (!isBroadcastSideDone) { - return InputSelection.SECOND; - } else { - return InputSelection.FIRST; - } - } - @Override public void processElement1(StreamRecord element) throws Exception { Preconditions.checkState( diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java index 6b2544e21e034..35be445ef91d1 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java @@ -25,13 +25,13 @@ import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation; +import org.apache.flink.streaming.runtime.translators.PythonBroadcastStateTransformationTranslator; import java.util.List; /** * A {@link Transformation} representing a Python Co-Broadcast-Process operation, which will be - * translated into different operations by {@link - * org.apache.flink.streaming.runtime.translators.python.PythonBroadcastStateTransformationTranslator}. + * translated into different operations by {@link PythonBroadcastStateTransformationTranslator}. */ @Internal public class PythonBroadcastStateTransformation diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java index 72341622243f0..ecbd57778c7cb 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation; +import org.apache.flink.streaming.runtime.translators.PythonKeyedBroadcastStateTransformationTranslator; import org.apache.flink.types.Row; import java.util.List; @@ -33,7 +34,7 @@ /** * A {@link Transformation} representing a Python Keyed-Co-Broadcast-Process operation, which will * be translated into different operations by {@link - * org.apache.flink.streaming.runtime.translators.python.PythonKeyedBroadcastStateTransformationTranslator}. + * PythonKeyedBroadcastStateTransformationTranslator}. */ @Internal public class PythonKeyedBroadcastStateTransformation diff --git a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/PythonBroadcastStateTransformationTranslator.java similarity index 97% rename from flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java rename to flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/PythonBroadcastStateTransformationTranslator.java index 6bd777c0afa8f..ebc19e408de9c 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/PythonBroadcastStateTransformationTranslator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.translators.python; +package org.apache.flink.streaming.runtime.translators; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; @@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.operators.python.process.ExternalPythonCoProcessOperator; import org.apache.flink.streaming.api.transformations.python.DelegateOperatorTransformation; import org.apache.flink.streaming.api.transformations.python.PythonBroadcastStateTransformation; -import org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator; import org.apache.flink.util.Preconditions; import java.util.Collection; diff --git a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/PythonKeyedBroadcastStateTransformationTranslator.java similarity index 87% rename from flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java rename to flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/PythonKeyedBroadcastStateTransformationTranslator.java index 9fac56d246fbe..844f1ffc06f3b 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/PythonKeyedBroadcastStateTransformationTranslator.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.translators.python; +package org.apache.flink.streaming.runtime.translators; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; import org.apache.flink.python.PythonOptions; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator; import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonBatchKeyedCoBroadcastProcessOperator; @@ -28,7 +29,6 @@ import org.apache.flink.streaming.api.operators.python.process.ExternalPythonKeyedCoProcessOperator; import org.apache.flink.streaming.api.transformations.python.DelegateOperatorTransformation; import org.apache.flink.streaming.api.transformations.python.PythonKeyedBroadcastStateTransformation; -import org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -76,15 +76,24 @@ protected Collection translateForBatchInternal( DelegateOperatorTransformation.configureOperator(transformation, operator); - return translateInternal( - transformation, - transformation.getRegularInput(), - transformation.getBroadcastInput(), - SimpleOperatorFactory.of(operator), - transformation.getStateKeyType(), - transformation.getKeySelector(), - null, - context); + Collection result = + translateInternal( + transformation, + transformation.getRegularInput(), + transformation.getBroadcastInput(), + SimpleOperatorFactory.of(operator), + transformation.getStateKeyType(), + transformation.getKeySelector(), + null, + context); + + BatchExecutionUtils.applyBatchExecutionSettings( + transformation.getId(), + context, + StreamConfig.InputRequirement.SORTED, + StreamConfig.InputRequirement.PASS_THROUGH); + + return result; } @Override From 0fa77b74452557a3f86dee75eb6e893f0a4f82fc Mon Sep 17 00:00:00 2001 From: Juntao Hu Date: Mon, 6 Mar 2023 21:02:19 +0800 Subject: [PATCH 2/2] revert package relocation --- .../java/org/apache/flink/python/util/PythonConfigUtil.java | 4 ++-- .../python/PythonBroadcastStateTransformation.java | 4 ++-- .../python/PythonKeyedBroadcastStateTransformation.java | 3 +-- .../PythonBroadcastStateTransformationTranslator.java | 3 ++- .../PythonKeyedBroadcastStateTransformationTranslator.java | 4 +++- .../streaming/runtime/translators/BatchExecutionUtils.java | 4 ++-- 6 files changed, 12 insertions(+), 10 deletions(-) rename flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/{ => python}/PythonBroadcastStateTransformationTranslator.java (97%) rename flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/{ => python}/PythonKeyedBroadcastStateTransformationTranslator.java (96%) diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java index 7bdea8ecd2809..4666c5df79023 100644 --- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java +++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java @@ -48,8 +48,8 @@ import org.apache.flink.streaming.api.utils.ByteArrayWrapper; import org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; -import org.apache.flink.streaming.runtime.translators.PythonBroadcastStateTransformationTranslator; -import org.apache.flink.streaming.runtime.translators.PythonKeyedBroadcastStateTransformationTranslator; +import org.apache.flink.streaming.runtime.translators.python.PythonBroadcastStateTransformationTranslator; +import org.apache.flink.streaming.runtime.translators.python.PythonKeyedBroadcastStateTransformationTranslator; import org.apache.flink.util.OutputTag; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java index 35be445ef91d1..6b2544e21e034 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java @@ -25,13 +25,13 @@ import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation; -import org.apache.flink.streaming.runtime.translators.PythonBroadcastStateTransformationTranslator; import java.util.List; /** * A {@link Transformation} representing a Python Co-Broadcast-Process operation, which will be - * translated into different operations by {@link PythonBroadcastStateTransformationTranslator}. + * translated into different operations by {@link + * org.apache.flink.streaming.runtime.translators.python.PythonBroadcastStateTransformationTranslator}. */ @Internal public class PythonBroadcastStateTransformation diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java index ecbd57778c7cb..72341622243f0 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java @@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation; -import org.apache.flink.streaming.runtime.translators.PythonKeyedBroadcastStateTransformationTranslator; import org.apache.flink.types.Row; import java.util.List; @@ -34,7 +33,7 @@ /** * A {@link Transformation} representing a Python Keyed-Co-Broadcast-Process operation, which will * be translated into different operations by {@link - * PythonKeyedBroadcastStateTransformationTranslator}. + * org.apache.flink.streaming.runtime.translators.python.PythonKeyedBroadcastStateTransformationTranslator}. */ @Internal public class PythonKeyedBroadcastStateTransformation diff --git a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/PythonBroadcastStateTransformationTranslator.java b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java similarity index 97% rename from flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/PythonBroadcastStateTransformationTranslator.java rename to flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java index ebc19e408de9c..6bd777c0afa8f 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/PythonBroadcastStateTransformationTranslator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.translators; +package org.apache.flink.streaming.runtime.translators.python; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.python.process.ExternalPythonCoProcessOperator; import org.apache.flink.streaming.api.transformations.python.DelegateOperatorTransformation; import org.apache.flink.streaming.api.transformations.python.PythonBroadcastStateTransformation; +import org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator; import org.apache.flink.util.Preconditions; import java.util.Collection; diff --git a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/PythonKeyedBroadcastStateTransformationTranslator.java b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java similarity index 96% rename from flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/PythonKeyedBroadcastStateTransformationTranslator.java rename to flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java index 844f1ffc06f3b..a17373add23bb 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/PythonKeyedBroadcastStateTransformationTranslator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.runtime.translators; +package org.apache.flink.streaming.runtime.translators.python; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; @@ -29,6 +29,8 @@ import org.apache.flink.streaming.api.operators.python.process.ExternalPythonKeyedCoProcessOperator; import org.apache.flink.streaming.api.transformations.python.DelegateOperatorTransformation; import org.apache.flink.streaming.api.transformations.python.PythonKeyedBroadcastStateTransformation; +import org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator; +import org.apache.flink.streaming.runtime.translators.BatchExecutionUtils; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java index 0a922ae3088e6..0eaa0b65310d1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java @@ -39,10 +39,10 @@ import static org.apache.flink.util.Preconditions.checkState; /** A utility class for applying sorting inputs. */ -class BatchExecutionUtils { +public class BatchExecutionUtils { private static final Logger LOG = LoggerFactory.getLogger(BatchExecutionUtils.class); - static void applyBatchExecutionSettings( + public static void applyBatchExecutionSettings( int transformationId, TransformationTranslator.Context context, StreamConfig.InputRequirement... inputRequirements) {