Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

Expand All @@ -38,7 +36,7 @@
@Internal
public class EmbeddedPythonBatchKeyedCoBroadcastProcessOperator<K, IN1, IN2, OUT>
extends EmbeddedPythonKeyedCoProcessOperator<K, IN1, IN2, OUT>
implements BoundedMultiInput, InputSelectable {
implements BoundedMultiInput {

private static final long serialVersionUID = 1L;

Expand All @@ -60,15 +58,6 @@ public void endInput(int inputId) throws Exception {
}
}

@Override
public InputSelection nextSelection() {
if (!isBroadcastSideDone) {
return InputSelection.SECOND;
} else {
return InputSelection.FIRST;
}
}

@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
Preconditions.checkState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
Expand All @@ -38,8 +36,7 @@
*/
@Internal
public class ExternalPythonBatchKeyedCoBroadcastProcessOperator<OUT>
extends ExternalPythonKeyedCoProcessOperator<OUT>
implements BoundedMultiInput, InputSelectable {
extends ExternalPythonKeyedCoProcessOperator<OUT> implements BoundedMultiInput {

private static final long serialVersionUID = 1L;

Expand All @@ -61,15 +58,6 @@ public void endInput(int inputId) throws Exception {
}
}

@Override
public InputSelection nextSelection() {
if (!isBroadcastSideDone) {
return InputSelection.SECOND;
} else {
return InputSelection.FIRST;
}
}

@Override
public void processElement1(StreamRecord<Row> element) throws Exception {
Preconditions.checkState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonBatchKeyedCoBroadcastProcessOperator;
Expand All @@ -29,6 +30,7 @@
import org.apache.flink.streaming.api.transformations.python.DelegateOperatorTransformation;
import org.apache.flink.streaming.api.transformations.python.PythonKeyedBroadcastStateTransformation;
import org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.BatchExecutionUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -76,15 +78,24 @@ protected Collection<Integer> translateForBatchInternal(

DelegateOperatorTransformation.configureOperator(transformation, operator);

return translateInternal(
transformation,
transformation.getRegularInput(),
transformation.getBroadcastInput(),
SimpleOperatorFactory.of(operator),
transformation.getStateKeyType(),
transformation.getKeySelector(),
null,
context);
Collection<Integer> result =
translateInternal(
transformation,
transformation.getRegularInput(),
transformation.getBroadcastInput(),
SimpleOperatorFactory.of(operator),
transformation.getStateKeyType(),
transformation.getKeySelector(),
null,
context);

BatchExecutionUtils.applyBatchExecutionSettings(
transformation.getId(),
context,
StreamConfig.InputRequirement.SORTED,
StreamConfig.InputRequirement.PASS_THROUGH);

return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import static org.apache.flink.util.Preconditions.checkState;

/** A utility class for applying sorting inputs. */
class BatchExecutionUtils {
public class BatchExecutionUtils {
private static final Logger LOG = LoggerFactory.getLogger(BatchExecutionUtils.class);

static void applyBatchExecutionSettings(
public static void applyBatchExecutionSettings(
int transformationId,
TransformationTranslator.Context context,
StreamConfig.InputRequirement... inputRequirements) {
Expand Down