diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 3b7bb626b1aac..8d4e0bea1b8f6 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -589,6 +589,40 @@ def process_element2(self, value, ctx: 'CoProcessFunction.Context'):
         side_expected = ['0', '0', '1', '1', '2', '3']
         self.assert_equals_sorted(side_expected, side_sink.get_results())

+    def test_co_broadcast_side_output(self):
+        tag = OutputTag("side", Types.INT())
+
+        class MyBroadcastProcessFunction(BroadcastProcessFunction):
+
+            def process_element(self, value, ctx):
+                yield value[0]
+                yield tag, value[1]
+
+            def process_broadcast_element(self, value, ctx):
+                yield value[1]
+                yield tag, value[0]
+
+        self.env.set_parallelism(2)
+        ds = self.env.from_collection([('a', 0), ('b', 1), ('c', 2)],
+                                      type_info=Types.ROW([Types.STRING(), Types.INT()]))
+        ds_broadcast = self.env.from_collection([(3, 'd'), (4, 'f')],
+                                                type_info=Types.ROW([Types.INT(), Types.STRING()]))
+        map_state_desc = MapStateDescriptor(
+            "dummy", key_type_info=Types.INT(), value_type_info=Types.STRING()
+        )
+        ds = ds.connect(ds_broadcast.broadcast(map_state_desc)).process(
+            MyBroadcastProcessFunction(), output_type=Types.STRING()
+        )
+        side_sink = DataStreamTestSinkFunction()
+        ds.get_side_output(tag).add_sink(side_sink)
+        ds.add_sink(self.test_sink)
+
+        self.env.execute("test_co_broadcast_process_side_output")
+        main_expected = ['a', 'b', 'c', 'd', 'd', 'f', 'f']
+        self.assert_equals_sorted(main_expected, self.test_sink.get_results())
+        side_expected = ['0', '1', '2', '3', '3', '4', '4']
+        self.assert_equals_sorted(side_expected, side_sink.get_results())
+
     def test_keyed_process_side_output(self):
         tag = OutputTag("side", Types.INT())

@@ -665,6 +699,49 @@ def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'):
         side_expected = ['1', '1', '2', '2', '3', '3', '4', '4']
         self.assert_equals_sorted(side_expected, side_sink.get_results())

+    def test_keyed_co_broadcast_side_output(self):
+        tag = OutputTag("side", Types.INT())
+
+        class MyKeyedBroadcastProcessFunction(KeyedBroadcastProcessFunction):
+
+            def __init__(self):
+                self.reducing_state = None  # type: ReducingState
+
+            def open(self, context: RuntimeContext):
+                self.reducing_state = context.get_reducing_state(
+                    ReducingStateDescriptor("reduce", lambda i, j: i+j, Types.INT())
+                )
+
+            def process_element(self, value, ctx):
+                self.reducing_state.add(value[1])
+                yield value[0]
+                yield tag, self.reducing_state.get()
+
+            def process_broadcast_element(self, value, ctx):
+                yield value[1]
+                yield tag, value[0]
+
+        self.env.set_parallelism(2)
+        ds = self.env.from_collection([('a', 0), ('b', 1), ('a', 2), ('b', 3)],
+                                      type_info=Types.ROW([Types.STRING(), Types.INT()]))
+        ds_broadcast = self.env.from_collection([(5, 'c'), (6, 'd')],
+                                                type_info=Types.ROW([Types.INT(), Types.STRING()]))
+        map_state_desc = MapStateDescriptor(
+            "dummy", key_type_info=Types.INT(), value_type_info=Types.STRING()
+        )
+        ds = ds.key_by(lambda e: e[0]).connect(ds_broadcast.broadcast(map_state_desc)).process(
+            MyKeyedBroadcastProcessFunction(), output_type=Types.STRING()
+        )
+        side_sink = DataStreamTestSinkFunction()
+        ds.get_side_output(tag).add_sink(side_sink)
+        ds.add_sink(self.test_sink)
+
+        self.env.execute("test_keyed_co_broadcast_process_side_output")
+        main_expected = ['a', 'a', 'b', 'b', 'c', 'c', 'd', 'd']
+        self.assert_equals_sorted(main_expected, self.test_sink.get_results())
+        side_expected = ['0', '1', '2', '4', '5', '5', '6', '6']
+        self.assert_equals_sorted(side_expected, side_sink.get_results())
+
     def test_side_output_stream_execute_and_collect(self):
         tag = OutputTag("side", Types.INT())

diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/operations.py b/flink-python/pyflink/fn_execution/datastream/embedded/operations.py
index 7bac48cdf109c..5160bc9de26e7 100644
--- a/flink-python/pyflink/fn_execution/datastream/embedded/operations.py
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/operations.py
@@ -100,6 +100,8 @@ def extract_process_function(
         side_output_context = SideOutputContext(j_side_output_context)

         def process_func(values):
+            if values is None:
+                return
             for value in values:
                 if isinstance(value, tuple) and isinstance(value[0], OutputTag):
                     output_tag = value[0]  # type: OutputTag
@@ -108,6 +110,8 @@ def process_func(values):
                     yield value
     else:
         def process_func(values):
+            if values is None:
+                return
             yield from values

     def open_func():
@@ -174,14 +178,10 @@ def process_element_func2(value):
         process_broadcast_element = user_defined_func.process_broadcast_element

         def process_element_func1(value):
-            elements = process_element(value, read_only_broadcast_ctx)
-            if elements:
-                yield from elements
+            yield from process_func(process_element(value, read_only_broadcast_ctx))

         def process_element_func2(value):
-            elements = process_broadcast_element(value, broadcast_ctx)
-            if elements:
-                yield from elements
+            yield from process_func(process_broadcast_element(value, broadcast_ctx))

         return TwoInputOperation(
             open_func, close_func, process_element_func1, process_element_func2)
@@ -221,19 +221,20 @@ def on_timer_func(timestamp):
             timer_context = InternalKeyedBroadcastProcessFunctionOnTimerContext(
                 j_timer_context, user_defined_function_proto.key_type_info, j_operator_state_backend)

+        keyed_state_backend = KeyedStateBackend(
+            read_only_broadcast_ctx,
+            j_keyed_state_backend)
+        runtime_context.set_keyed_state_backend(keyed_state_backend)
+
         process_element = user_defined_func.process_element
         process_broadcast_element = user_defined_func.process_broadcast_element
         on_timer = user_defined_func.on_timer

         def process_element_func1(value):
-            elements = process_element(value[1], read_only_broadcast_ctx)
-            if elements:
-                yield from elements
+            yield from process_func(process_element(value[1], read_only_broadcast_ctx))

         def process_element_func2(value):
-            elements = process_broadcast_element(value, broadcast_ctx)
-            if elements:
-                yield from elements
+            yield from process_func(process_broadcast_element(value, broadcast_ctx))

         def on_timer_func(timestamp):
             yield from on_timer(timestamp, timer_context)
diff --git a/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java b/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
index 31001dcb8cff8..547788144aab7 100644
--- a/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
@@ -55,6 +55,8 @@
 import org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.streaming.api.transformations.python.PythonBroadcastStateTransformation;
+import org.apache.flink.streaming.api.transformations.python.PythonKeyedBroadcastStateTransformation;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;

 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
@@ -408,6 +410,11 @@ private static boolean areOperatorsChainable(
             return false;
         }

+        if (upTransform instanceof PythonBroadcastStateTransformation
+                || upTransform instanceof PythonKeyedBroadcastStateTransformation) {
+            return false;
+        }
+
         DataStreamPythonFunctionOperator upOperator =
                 (DataStreamPythonFunctionOperator)
                         ((SimpleOperatorFactory) getOperatorFactory(upTransform)).getOperator();
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index 8f942818d6416..4666c5df79023 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -42,6 +42,7 @@
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.transformations.python.DelegateOperatorTransformation;
 import org.apache.flink.streaming.api.transformations.python.PythonBroadcastStateTransformation;
 import org.apache.flink.streaming.api.transformations.python.PythonKeyedBroadcastStateTransformation;
 import org.apache.flink.streaming.api.utils.ByteArrayWrapper;
@@ -152,6 +153,8 @@ public static StreamOperatorFactory getOperatorFactory(Transformation transform) {
             return ((TwoInputTransformation) transform).getOperatorFactory();
         } else if (transform instanceof AbstractMultipleInputTransformation) {
             return ((AbstractMultipleInputTransformation) transform).getOperatorFactory();
+        } else if (transform instanceof DelegateOperatorTransformation) {
+            return ((DelegateOperatorTransformation) transform).getOperatorFactory();
         } else {
             return null;
         }
@@ -214,6 +217,9 @@ private static AbstractPythonFunctionOperator getPythonOperator(
         } else if (transformation instanceof AbstractMultipleInputTransformation) {
             operatorFactory =
                     ((AbstractMultipleInputTransformation) transformation).getOperatorFactory();
+        } else if (transformation instanceof DelegateOperatorTransformation) {
+            operatorFactory =
+                    ((DelegateOperatorTransformation) transformation).getOperatorFactory();
         }

         if (operatorFactory instanceof SimpleOperatorFactory
@@ -260,6 +266,9 @@ public static boolean isPythonDataStreamOperator(Transformation transform) {
         } else if (transform instanceof TwoInputTransformation) {
             return isPythonDataStreamOperator(
                     ((TwoInputTransformation) transform).getOperatorFactory());
+        } else if (transform instanceof PythonBroadcastStateTransformation
+                || transform instanceof PythonKeyedBroadcastStateTransformation) {
+            return true;
         } else {
             return false;
         }
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/DelegateOperatorTransformation.java b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/DelegateOperatorTransformation.java
new file mode 100644
index 0000000000000..40623cb7e1187
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/DelegateOperatorTransformation.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
+import org.apache.flink.streaming.api.operators.python.DataStreamPythonFunctionOperator;
+import org.apache.flink.util.OutputTag;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * For those {@link org.apache.flink.api.dag.Transformation} that don't have an operator entity,
+ * {@link DelegateOperatorTransformation} provides a {@link SimpleOperatorFactory} containing a
+ * {@link DelegateOperator}, which can hold special configurations during transformation
+ * preprocessing for Python jobs, and later be queried at translation stage. Currently, those
+ * configurations include {@link OutputTag}s, {@code numPartitions} and general {@link
+ * Configuration}.
+ */
+public interface DelegateOperatorTransformation<OUT> {
+
+    SimpleOperatorFactory<OUT> getOperatorFactory();
+
+    static void configureOperator(
+            DelegateOperatorTransformation<?> transformation,
+            AbstractPythonFunctionOperator<?> operator) {
+        DelegateOperator<?> delegateOperator =
+                (DelegateOperator<?>) transformation.getOperatorFactory().getOperator();
+
+        operator.getConfiguration().addAll(delegateOperator.getConfiguration());
+
+        if (operator instanceof DataStreamPythonFunctionOperator) {
+            DataStreamPythonFunctionOperator<?> dataStreamOperator =
+                    (DataStreamPythonFunctionOperator<?>) operator;
+            dataStreamOperator.addSideOutputTags(delegateOperator.getSideOutputTags());
+            if (delegateOperator.getNumPartitions() != null) {
+                dataStreamOperator.setNumPartitions(delegateOperator.getNumPartitions());
+            }
+        }
+    }
+
+    /**
+     * {@link DelegateOperator} holds configurations, e.g. {@link OutputTag}s, which will be
+     * applied to the actual python operator at translation stage.
+     */
+    class DelegateOperator<OUT> extends AbstractPythonFunctionOperator<OUT>
+            implements DataStreamPythonFunctionOperator<OUT> {
+
+        private final Map<String, OutputTag<?>> sideOutputTags = new HashMap<>();
+        private @Nullable Integer numPartitions = null;
+
+        public DelegateOperator() {
+            super(new Configuration());
+        }
+
+        @Override
+        public void addSideOutputTags(Collection<OutputTag<?>> outputTags) {
+            for (OutputTag<?> outputTag : outputTags) {
+                sideOutputTags.put(outputTag.getId(), outputTag);
+            }
+        }
+
+        @Override
+        public Collection<OutputTag<?>> getSideOutputTags() {
+            return sideOutputTags.values();
+        }
+
+        @Override
+        public void setNumPartitions(int numPartitions) {
+            this.numPartitions = numPartitions;
+        }
+
+        @Nullable
+        public Integer getNumPartitions() {
+            return numPartitions;
+        }
+
+        @Override
+        public TypeInformation<OUT> getProducedType() {
+            throw new RuntimeException("This should not be invoked on a DelegateOperator!");
+        }
+
+        @Override
+        public DataStreamPythonFunctionInfo getPythonFunctionInfo() {
+            throw new RuntimeException("This should not be invoked on a DelegateOperator!");
+        }
+
+        @Override
+        public <T> DataStreamPythonFunctionOperator<T> copy(
+                DataStreamPythonFunctionInfo pythonFunctionInfo,
+                TypeInformation<T> outputTypeInfo) {
+            throw new RuntimeException("This should not be invoked on a DelegateOperator!");
+        }
+
+        @Override
+        protected void invokeFinishBundle() throws Exception {
+            throw new RuntimeException("This should not be invoked on a DelegateOperator!");
+        }
+
+        @Override
+        protected PythonEnvironmentManager createPythonEnvironmentManager() {
+            throw new RuntimeException("This should not be invoked on a DelegateOperator!");
+        }
+    }
+}
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java
index b6c2be777a18b..6b2544e21e034 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonBroadcastStateTransformation.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation;

 import java.util.List;
@@ -34,10 +35,12 @@
  */
 @Internal
 public class PythonBroadcastStateTransformation
-        extends AbstractBroadcastStateTransformation {
+        extends AbstractBroadcastStateTransformation
+        implements DelegateOperatorTransformation {

     private final Configuration configuration;
     private final DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo;
+    private final SimpleOperatorFactory delegateOperatorFactory;

     public PythonBroadcastStateTransformation(
             String name,
@@ -57,6 +60,7 @@ public PythonBroadcastStateTransformation(
                 parallelism);
         this.configuration = configuration;
         this.dataStreamPythonFunctionInfo = dataStreamPythonFunctionInfo;
+        this.delegateOperatorFactory = SimpleOperatorFactory.of(new DelegateOperator<>());
         updateManagedMemoryStateBackendUseCase(false);
     }

@@ -67,4 +71,8 @@ public Configuration getConfiguration() {
     public DataStreamPythonFunctionInfo getDataStreamPythonFunctionInfo() {
         return dataStreamPythonFunctionInfo;
     }
+
+    public SimpleOperatorFactory getOperatorFactory() {
+        return delegateOperatorFactory;
+    }
 }
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java
index 03919f609eb29..72341622243f0 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/transformations/python/PythonKeyedBroadcastStateTransformation.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.transformations.AbstractBroadcastStateTransformation;
 import org.apache.flink.types.Row;

@@ -36,12 +37,14 @@
  */
 @Internal
 public class PythonKeyedBroadcastStateTransformation
-        extends AbstractBroadcastStateTransformation {
+        extends AbstractBroadcastStateTransformation
+        implements DelegateOperatorTransformation {

     private final Configuration configuration;
     private final DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo;
     private final TypeInformation stateKeyType;
     private final KeySelector keySelector;
+    private final SimpleOperatorFactory delegateOperatorFactory;

     public PythonKeyedBroadcastStateTransformation(
             String name,
@@ -65,6 +68,7 @@ public PythonKeyedBroadcastStateTransformation(
         this.dataStreamPythonFunctionInfo = dataStreamPythonFunctionInfo;
         this.stateKeyType = keyType;
         this.keySelector = keySelector;
+        this.delegateOperatorFactory = SimpleOperatorFactory.of(new DelegateOperator<>());
         updateManagedMemoryStateBackendUseCase(true);
     }

@@ -83,4 +87,9 @@ public TypeInformation getStateKeyType() {
     public KeySelector getKeySelector() {
         return keySelector;
     }
+
+    @Override
+    public SimpleOperatorFactory getOperatorFactory() {
+        return delegateOperatorFactory;
+    }
 }
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java
index 32d9a2597548f..6bd777c0afa8f 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonBroadcastStateTransformationTranslator.java
@@ -21,11 +21,12 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.PythonOptions;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
 import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonBatchCoBroadcastProcessOperator;
 import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonCoProcessOperator;
 import org.apache.flink.streaming.api.operators.python.process.ExternalPythonBatchCoBroadcastProcessOperator;
 import org.apache.flink.streaming.api.operators.python.process.ExternalPythonCoProcessOperator;
+import org.apache.flink.streaming.api.transformations.python.DelegateOperatorTransformation;
 import org.apache.flink.streaming.api.transformations.python.PythonBroadcastStateTransformation;
 import org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator;
 import org.apache.flink.util.Preconditions;
@@ -52,7 +53,7 @@ protected Collection translateForBatchInternal(

         Configuration config = transformation.getConfiguration();

-        StreamOperator operator;
+        AbstractPythonFunctionOperator operator;

         if (config.get(PythonOptions.PYTHON_EXECUTION_MODE).equals("thread")) {
             operator =
@@ -72,6 +73,8 @@ protected Collection translateForBatchInternal(
                             transformation.getOutputType());
         }

+        DelegateOperatorTransformation.configureOperator(transformation, operator);
+
         return translateInternal(
                 transformation,
                 transformation.getRegularInput(),
@@ -91,7 +94,7 @@ protected Collection translateForStreamingInternal(

         Configuration config = transformation.getConfiguration();

-        StreamOperator operator;
+        AbstractPythonFunctionOperator operator;

         if (config.get(PythonOptions.PYTHON_EXECUTION_MODE).equals("thread")) {
             operator =
@@ -112,6 +115,8 @@ protected Collection translateForStreamingInternal(
                             transformation.getOutputType());
         }

+        DelegateOperatorTransformation.configureOperator(transformation, operator);
+
         return translateInternal(
                 transformation,
                 transformation.getRegularInput(),
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java
index cdbf89c142088..9fac56d246fbe 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/runtime/translators/python/PythonKeyedBroadcastStateTransformationTranslator.java
@@ -21,11 +21,12 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.PythonOptions;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator;
 import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonBatchKeyedCoBroadcastProcessOperator;
 import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonKeyedCoProcessOperator;
 import org.apache.flink.streaming.api.operators.python.process.ExternalPythonBatchKeyedCoBroadcastProcessOperator;
 import org.apache.flink.streaming.api.operators.python.process.ExternalPythonKeyedCoProcessOperator;
+import org.apache.flink.streaming.api.transformations.python.DelegateOperatorTransformation;
 import org.apache.flink.streaming.api.transformations.python.PythonKeyedBroadcastStateTransformation;
 import org.apache.flink.streaming.runtime.translators.AbstractTwoInputTransformationTranslator;
 import org.apache.flink.types.Row;
@@ -53,7 +54,7 @@ protected Collection translateForBatchInternal(

         Configuration config = transformation.getConfiguration();

-        StreamOperator operator;
+        AbstractPythonFunctionOperator operator;

         if (config.get(PythonOptions.PYTHON_EXECUTION_MODE).equals("thread")) {
             operator =
@@ -73,6 +74,8 @@ protected Collection translateForBatchInternal(
                             transformation.getOutputType());
         }

+        DelegateOperatorTransformation.configureOperator(transformation, operator);
+
         return translateInternal(
                 transformation,
                 transformation.getRegularInput(),
@@ -92,7 +95,7 @@ protected Collection translateForStreamingInternal(

         Configuration config = transformation.getConfiguration();

-        StreamOperator operator;
+        AbstractPythonFunctionOperator operator;

         if (config.get(PythonOptions.PYTHON_EXECUTION_MODE).equals("thread")) {
             operator =
@@ -113,6 +116,8 @@ protected Collection translateForStreamingInternal(
                             transformation.getOutputType());
         }

+        DelegateOperatorTransformation.configureOperator(transformation, operator);
+
         return translateInternal(
                 transformation,
                 transformation.getRegularInput(),