[FLINK-23616][python] Support to chain the Python DataStream operators as much as possible

This closes apache#16777.
dianfu authored and Kexin Hui committed Aug 25, 2021
1 parent 438f7f0 commit 0989e44
Showing 33 changed files with 2,045 additions and 671 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/python_configuration.html
@@ -80,6 +80,12 @@
<td>Boolean</td>
<td>When it is false, metrics for Python will be disabled. You can disable metrics to achieve better performance in some circumstances.</td>
</tr>
<tr>
<td><h5>python.operator-chaining.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Python operator chaining allows non-shuffle operations to be co-located in the same thread, fully avoiding serialization and de-serialization.</td>
</tr>
<tr>
<td><h5>python.profile.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
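For reference, a minimal sketch of turning the new option off from a PyFlink program. The option key is taken from the table above; passing a Configuration into get_execution_environment assumes a recent PyFlink release, and on versions without that overload the same key can be set in flink-conf.yaml or via -D on the command line:

from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
# Chaining is enabled by default; disable it e.g. to observe per-operator
# behavior while debugging.
config.set_boolean("python.operator-chaining.enabled", False)
env = StreamExecutionEnvironment.get_execution_environment(config)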
2 changes: 1 addition & 1 deletion flink-python/pyflink/datastream/data_stream.py
@@ -68,7 +68,7 @@ def get_name(self) -> str:
def name(self, name: str) -> 'DataStream':
"""
Sets the name of the current data stream. This name is used by the visualization and logging
during runting.
during runtime.
:param name: Name of the stream.
:return: The named operator.
10 changes: 5 additions & 5 deletions flink-python/pyflink/datastream/stream_execution_environment.py
@@ -737,7 +737,6 @@ def execute(self, job_name: str = None) -> JobExecutionResult:
"""

j_stream_graph = self._generate_stream_graph(clear_transformations=True, job_name=job_name)

return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))

def execute_async(self, job_name: str = 'Flink Streaming Job') -> JobClient:
@@ -752,7 +751,6 @@ def execute_async(self, job_name: str = 'Flink Streaming Job') -> JobClient:
submission succeeded.
"""
j_stream_graph = self._generate_stream_graph(clear_transformations=True, job_name=job_name)

j_job_client = self._j_stream_execution_environment.executeAsync(j_stream_graph)
return JobClient(j_job_client=j_job_client)
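Since execute_async returns as soon as submission succeeds, the final result is retrieved through the returned JobClient; a minimal usage sketch of that pattern:

job_client = env.execute_async('Flink Streaming Job')
# Block until the job finishes and fetch its execution result.
job_result = job_client.get_job_execution_result().result()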

@@ -769,7 +767,6 @@ def get_execution_plan(self) -> str:
:return: The execution plan of the program, as a JSON String.
"""
j_stream_graph = self._generate_stream_graph(False)

return j_stream_graph.getStreamingPlanAsJSON()

@staticmethod
@@ -920,9 +917,12 @@ def _from_collection(self, elements: List[Any],

def _generate_stream_graph(self, clear_transformations: bool = False, job_name: str = None) \
-> JavaObject:
j_stream_graph = get_gateway().jvm \
gateway = get_gateway()
gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
self._j_stream_execution_environment)
j_stream_graph = gateway.jvm \
.org.apache.flink.python.util.PythonConfigUtil.generateStreamGraphWithDependencies(
self._j_stream_execution_environment, clear_transformations)
self._j_stream_execution_environment, clear_transformations)

if job_name is not None:
j_stream_graph.setJobName(job_name)
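To make the new call above concrete: PythonOperatorChainingOptimizer.apply rewrites the transformations before the stream graph is generated, fusing runs of adjacent Python operators that no shuffle separates. A self-contained, plain-Python sketch of that idea (the Transform type and field names are illustrative stand-ins, not the actual Java classes):

from dataclasses import dataclass
from typing import List

@dataclass
class Transform:
    name: str
    is_python: bool      # runs in a Python worker
    needs_shuffle: bool  # preceded by key_by/rebalance etc.

def chain_python_transforms(transforms: List[Transform]) -> List[Transform]:
    """Fuse adjacent Python transforms that are not separated by a shuffle."""
    chained: List[Transform] = []
    for t in transforms:
        prev = chained[-1] if chained else None
        if prev is not None and prev.is_python and t.is_python and not t.needs_shuffle:
            # Merge into one chained operator: records flow between the fused
            # functions as plain Python objects, with no serialization step.
            chained[-1] = Transform(prev.name + ' -> ' + t.name, True, prev.needs_shuffle)
        else:
            chained.append(t)
    return chained

# map and filter fuse into one operator; the keyed process starts a new chain.
print(chain_python_transforms([
    Transform('map', True, False),
    Transform('filter', True, False),
    Transform('keyed-process', True, True),
]))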
41 changes: 23 additions & 18 deletions flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -197,16 +197,17 @@ def test_keyed_co_process(self):
ds1.connect(ds2) \
.key_by(lambda x: x[0], lambda x: x[0]) \
.process(MyKeyedCoProcessFunction()) \
.map(lambda x: Row(x[0], x[1] + 1)) \
.add_sink(self.test_sink)
self.env.execute('test_keyed_co_process_function')
results = self.test_sink.get_results(True)
expected = ["<Row('a', 1)>",
"<Row('b', 1)>",
expected = ["<Row('a', 2)>",
"<Row('b', 2)>",
"<Row('c', 1)>",
"<Row('b', 3)>",
"<Row('c', 2)>",
"<Row('d', 1)>",
"<Row('on_timer', 3)>"]
"<Row('c', 3)>",
"<Row('d', 2)>",
"<Row('on_timer', 4)>"]
self.assert_equals_sorted(expected, results)

def test_keyed_co_map(self):
@@ -246,14 +247,13 @@ def map2(self, value):
ds1.connect(ds2)\
.key_by(MyKeySelector(), MyKeySelector(), key_type=Types.INT())\
.map(AssertKeyCoMapFunction())\
.map(lambda x: (x[0], x[1] + 1)) \
.add_sink(self.test_sink)

self.env.execute()
results = self.test_sink.get_results(True)
expected = ["Row(f0='e', f1=2)", "Row(f0='a', f1=0)", "Row(f0='b', f1=0)",
"Row(f0='c', f1=1)", "Row(f0='d', f1=1)", "Row(f0='e', f1=2)",
"Row(f0='a', f1=0)", "Row(f0='b', f1=0)", "Row(f0='c', f1=1)",
"Row(f0='d', f1=1)"]
expected = ["('e', 3)", "('a', 1)", "('b', 1)", "('c', 2)", "('d', 2)", "('e', 3)",
"('a', 1)", "('b', 1)", "('c', 2)", "('d', 2)"]
self.assert_equals_sorted(expected, results)

def test_keyed_co_flat_map(self):
@@ -263,10 +263,11 @@ def test_keyed_co_flat_map(self):
type_info=Types.ROW([Types.STRING(), Types.STRING()]))
ds1.connect(ds2).key_by(lambda x: 1, lambda x: 1) \
.flat_map(MyRichCoFlatMapFunction(), output_type=Types.STRING()) \
.filter(lambda x: x != '4') \
.add_sink(self.test_sink)
self.env.execute('test_keyed_co_flat_map')
results = self.test_sink.get_results(False)
expected = ['2', '2', '3', '3', '4', '4', 'a', 'b', 'c']
expected = ['2', '2', '3', '3', 'a', 'b', 'c']
self.assert_equals_sorted(expected, results)

def test_execute_and_collect(self):
@@ -361,11 +362,12 @@ def _get_state_value(self):
state_value += 1
return state_value

keyed_stream.map(AssertKeyMapFunction()).add_sink(self.test_sink)
keyed_stream.map(AssertKeyMapFunction())\
.map(lambda x: (x[0], x[1] + 1))\
.add_sink(self.test_sink)
self.env.execute('test_keyed_map')
results = self.test_sink.get_results(True)
expected = ["Row(f0='e', f1=2)", "Row(f0='a', f1=0)", "Row(f0='b', f1=1)",
"Row(f0='c', f1=0)", "Row(f0='d', f1=1)"]
expected = ["('e', 3)", "('a', 1)", "('b', 2)", "('c', 1)", "('d', 2)"]
self.assert_equals_sorted(expected, results)

def test_keyed_flat_map(self):
@@ -403,11 +405,12 @@ def flat_map(self, value):
self.state.update(state_value)
yield value

keyed_stream.flat_map(AssertKeyMapFunction()).add_sink(self.test_sink)
keyed_stream.flat_map(AssertKeyMapFunction())\
.map(lambda x: (x[0], x[1] + 1))\
.add_sink(self.test_sink)
self.env.execute('test_keyed_flat_map')
results = self.test_sink.get_results(True)
expected = ["Row(f0='e', f1=2)", "Row(f0='a', f1=0)", "Row(f0='b', f1=0)",
"Row(f0='c', f1=1)", "Row(f0='d', f1=1)"]
expected = ["('e', 3)", "('a', 1)", "('b', 1)", "('c', 2)", "('d', 2)"]
self.assert_equals_sorted(expected, results)

def test_keyed_filter(self):
@@ -447,10 +450,12 @@ def filter(self, value):
self.state.update(state_value)
return True

keyed_stream.filter(AssertKeyFilterFunction()).add_sink(self.test_sink)
keyed_stream.filter(AssertKeyFilterFunction())\
.filter(lambda x: x[1] > 0)\
.add_sink(self.test_sink)
self.env.execute('key_by_test')
results = self.test_sink.get_results(False)
expected = ['+I[a, 0]', '+I[c, 1]', '+I[e, 2]']
expected = ['+I[c, 1]', '+I[e, 2]']
self.assert_equals_sorted(expected, results)

def test_multi_key_by(self):
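The pattern running through these test updates: each keyed operation gets a shuffle-free map or filter appended, so the tests now exercise a chained Python operator, and the expected values shift accordingly. A minimal sketch of the same shape in user code, using only the standard PyFlink DataStream API:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    [('a', 0), ('b', 1), ('c', 2)],
    type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
# key_by introduces a shuffle, so a new chain starts at the map; the map and
# filter after it are shuffle-free and can now be fused into one operator.
ds.key_by(lambda x: x[0]) \
    .map(lambda x: (x[0], x[1] + 1)) \
    .filter(lambda x: x[1] > 0) \
    .print()
env.execute('chaining_example')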
30 changes: 27 additions & 3 deletions flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd
@@ -19,11 +19,34 @@

from apache_beam.coders.coder_impl cimport StreamCoderImpl
from apache_beam.runners.worker.operations cimport Operation
from apache_beam.utils.windowed_value cimport WindowedValue

from pyflink.fn_execution.coder_impl_fast cimport InputStreamWrapper

cdef class InputProcessor:
cpdef has_next(self)
cpdef next(self)

cdef class NetworkInputProcessor(InputProcessor):
cdef InputStreamWrapper _input_stream_wrapper

cdef class IntermediateInputProcessor(InputProcessor):
cdef object _input_values
cdef object _next_value

cdef class OutputProcessor:
cdef Operation _consumer
cpdef process_outputs(self, WindowedValue windowed_value, results)

cdef class NetworkOutputProcessor(OutputProcessor):
cdef StreamCoderImpl _value_coder_impl

cdef class IntermediateOutputProcessor(OutputProcessor):
pass

cdef class FunctionOperation(Operation):
cdef Operation consumer
cdef OutputProcessor _output_processor
cdef bint _is_python_coder
cdef StreamCoderImpl _value_coder_impl
cdef object process_element
cdef object operation
cdef object operation_cls
@@ -34,6 +57,7 @@ cdef class StatelessFunctionOperation(FunctionOperation):
pass

cdef class StatefulFunctionOperation(FunctionOperation):
cdef object keyed_state_backend
cdef object _keyed_state_backend
cdef WindowedValue _reusable_windowed_value
cpdef void add_timer_info(self, timer_family_id, timer_info)
cpdef process_timer(self, tag, timer_data)
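These declarations carry the core of the chaining mechanism: an operator reads from either the network (NetworkInputProcessor) or an upstream operator in the same chain (IntermediateInputProcessor), and writes either through the network coder (NetworkOutputProcessor) or directly into the downstream operator (IntermediateOutputProcessor). Schematically, and with hypothetical stand-in names rather than the actual Cython signatures, the processing loop they enable looks like:

def run(operation, windowed_value, input_processor, output_processor):
    # The same loop works whether elements come off the wire or from a chained
    # upstream operator, and whether results leave as bytes or as objects.
    while input_processor.has_next():
        result = operation.process_element(input_processor.next())
        output_processor.process_outputs(windowed_value, result)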
123 changes: 98 additions & 25 deletions flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
@@ -21,13 +21,83 @@
# cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True
from libc.stdint cimport *
from apache_beam.coders.coder_impl cimport OutputStream as BOutputStream
from apache_beam.utils cimport windowed_value
from apache_beam.utils.windowed_value cimport WindowedValue
from pyflink.fn_execution.coder_impl_fast cimport InputStreamWrapper

from apache_beam.runners.worker.bundle_processor import DataOutputOperation
from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkLengthPrefixCoderBeamWrapper
from pyflink.fn_execution.table.operations import BundleOperation
from pyflink.fn_execution.profiler import Profiler


cdef class InputProcessor:

cpdef has_next(self):
pass

cpdef next(self):
pass


cdef class NetworkInputProcessor(InputProcessor):

def __init__(self, InputStreamWrapper input_stream_wrapper):
self._input_stream_wrapper = input_stream_wrapper

cpdef has_next(self):
return self._input_stream_wrapper.has_next()

cpdef next(self):
return self._input_stream_wrapper.next()


cdef class IntermediateInputProcessor(InputProcessor):

def __init__(self, input_values):
self._input_values = input_values
self._next_value = None

cpdef has_next(self):
try:
self._next_value = next(self._input_values)
except StopIteration:
self._next_value = None

return self._next_value is not None

cpdef next(self):
return self._next_value


cdef class OutputProcessor:

cpdef process_outputs(self, WindowedValue windowed_value, results):
pass


cdef class NetworkOutputProcessor(OutputProcessor):

def __init__(self, consumer):
assert isinstance(consumer, DataOutputOperation)
self._consumer = consumer
self._value_coder_impl = consumer.windowed_coder.wrapped_value_coder.get_impl()

cpdef process_outputs(self, WindowedValue windowed_value, results):
output_stream = self._consumer.output_stream
self._value_coder_impl.encode_to_stream(results, output_stream, True)
output_stream.maybe_flush()


cdef class IntermediateOutputProcessor(OutputProcessor):

def __init__(self, consumer):
self._consumer = consumer

cpdef process_outputs(self, WindowedValue windowed_value, results):
self._consumer.process(windowed_value.with_value(results))


cdef class FunctionOperation(Operation):
"""
Base class of function operation that will execute StatelessFunction or StatefulFunction for
@@ -36,13 +106,18 @@ cdef class FunctionOperation(Operation):

def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls):
super(FunctionOperation, self).__init__(name, spec, counter_factory, sampler)
self.consumer = consumers['output'][0]
self._value_coder_impl = self.consumer.windowed_coder.wrapped_value_coder.get_impl()._value_coder
consumer = consumers['output'][0]
if isinstance(consumer, DataOutputOperation):
self._output_processor = NetworkOutputProcessor(consumer)

if isinstance(self._value_coder_impl, FlinkLengthPrefixCoderBeamWrapper):
self._is_python_coder = False
_value_coder_impl = consumer.windowed_coder.wrapped_value_coder.get_impl()._value_coder
if isinstance(_value_coder_impl, FlinkLengthPrefixCoderBeamWrapper):
self._is_python_coder = False
else:
self._is_python_coder = True
else:
self._is_python_coder = True
self._output_processor = IntermediateOutputProcessor(consumer)
self._is_python_coder = False

self.operation_cls = operation_cls
self.operation = self.generate_operation()
@@ -72,27 +147,24 @@

cpdef process(self, WindowedValue o):
cdef InputStreamWrapper input_stream_wrapper
cdef BOutputStream output_stream
output_stream = self.consumer.output_stream
cdef InputProcessor input_processor
with self.scoped_process_state:
if self._is_python_coder:
for value in o.value:
self._value_coder_impl.encode_to_stream(
self.process_element(value), output_stream, True)
output_stream.maybe_flush()
self._output_processor.process_outputs(o, self.process_element(value))
else:
input_stream_wrapper = o.value
if isinstance(o.value, InputStreamWrapper):
input_processor = NetworkInputProcessor(o.value)
else:
input_processor = IntermediateInputProcessor(o.value)
if isinstance(self.operation, BundleOperation):
while input_stream_wrapper.has_next():
self.process_element(input_stream_wrapper.next())
result = self.operation.finish_bundle()
self._value_coder_impl.encode_to_stream(
result, output_stream, True)
while input_processor.has_next():
self.process_element(input_processor.next())
self._output_processor.process_outputs(o, self.operation.finish_bundle())
else:
while input_stream_wrapper.has_next():
result = self.process_element(input_stream_wrapper.next())
self._value_coder_impl.encode_to_stream(
result, output_stream, True)
while input_processor.has_next():
result = self.process_element(input_processor.next())
self._output_processor.process_outputs(o, result)

def progress_metrics(self):
metrics = super(FunctionOperation, self).progress_metrics()
@@ -126,20 +198,21 @@
cdef class StatefulFunctionOperation(FunctionOperation):
def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls,
keyed_state_backend):
self.keyed_state_backend = keyed_state_backend
self._keyed_state_backend = keyed_state_backend
self._reusable_windowed_value = windowed_value.create(None, -1, None, None)
super(StatefulFunctionOperation, self).__init__(
name, spec, counter_factory, sampler, consumers, operation_cls)

cdef object generate_operation(self):
return self.operation_cls(self.spec, self.keyed_state_backend)
return self.operation_cls(self.spec, self._keyed_state_backend)

cpdef void add_timer_info(self, timer_family_id, timer_info):
# ignore timer_family_id
self.operation.add_timer_info(timer_info)

cpdef process_timer(self, tag, timer_data):
cdef BOutputStream output_stream
output_stream = self.consumer.output_stream
self._value_coder_impl.encode_to_stream(
self._output_processor.process_outputs(
self._reusable_windowed_value,
# the field user_key holds the timer data
self.operation.process_timer(timer_data.user_key), output_stream, True)
self.operation.process_timer(timer_data.user_key))
