Commit d466870: Address review

dianfu committed Aug 11, 2021
1 parent fa5fc95 commit d466870

Showing 4 changed files with 15 additions and 20 deletions.
16 changes: 5 additions & 11 deletions flink-python/pyflink/datastream/stream_execution_environment.py
@@ -736,9 +736,6 @@ def execute(self, job_name: str = None) -> JobExecutionResult:
         :return: The result of the job execution, containing elapsed time and accumulators.
         """

-        gateway = get_gateway()
-        gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
-            self._j_stream_execution_environment)
         j_stream_graph = self._generate_stream_graph(clear_transformations=True, job_name=job_name)
         return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))

@@ -753,9 +750,6 @@ def execute_async(self, job_name: str = 'Flink Streaming Job') -> JobClient:
         :return: A JobClient that can be used to communicate with the submitted job, completed on
             submission succeeded.
         """
-        gateway = get_gateway()
-        gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
-            self._j_stream_execution_environment)
         j_stream_graph = self._generate_stream_graph(clear_transformations=True, job_name=job_name)
         j_job_client = self._j_stream_execution_environment.executeAsync(j_stream_graph)
         return JobClient(j_job_client=j_job_client)
@@ -772,9 +766,6 @@ def get_execution_plan(self) -> str:
         :return: The execution plan of the program, as a JSON String.
         """
-        gateway = get_gateway()
-        gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
-            self._j_stream_execution_environment)
         j_stream_graph = self._generate_stream_graph(False)
         return j_stream_graph.getStreamingPlanAsJSON()

@@ -926,9 +917,12 @@ def _from_collection(self, elements: List[Any],

     def _generate_stream_graph(self, clear_transformations: bool = False, job_name: str = None) \
             -> JavaObject:
-        j_stream_graph = get_gateway().jvm \
+        gateway = get_gateway()
+        gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
+            self._j_stream_execution_environment)
+        j_stream_graph = gateway.jvm \
             .org.apache.flink.python.util.PythonConfigUtil.generateStreamGraphWithDependencies(
-            self._j_stream_execution_environment, clear_transformations)
+                self._j_stream_execution_environment, clear_transformations)

         if job_name is not None:
             j_stream_graph.setJobName(job_name)
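For reference, the consolidated method as it reads after this commit, reassembled from the hunks above (the tail of the method is hidden behind the fold; the trailing return is inferred from the -> JavaObject annotation):

    def _generate_stream_graph(self, clear_transformations: bool = False, job_name: str = None) \
            -> JavaObject:
        gateway = get_gateway()
        # Chaining optimization now runs here, once, for every caller that
        # needs a stream graph: execute(), execute_async() and
        # get_execution_plan() all go through this method.
        gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
            self._j_stream_execution_environment)
        j_stream_graph = gateway.jvm \
            .org.apache.flink.python.util.PythonConfigUtil.generateStreamGraphWithDependencies(
                self._j_stream_execution_environment, clear_transformations)

        if job_name is not None:
            j_stream_graph.setJobName(job_name)
        return j_stream_graph  # inferred: not shown in the visible hunk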
4 changes: 2 additions & 2 deletions flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd

@@ -35,14 +35,14 @@ cdef class IntermediateInputProcessor(InputProcessor):
     cdef object _next_value

 cdef class OutputProcessor:
+    cdef Operation _consumer
     cpdef process_outputs(self, WindowedValue windowed_value, results)

 cdef class NetworkOutputProcessor(OutputProcessor):
-    cdef Operation _consumer
     cdef StreamCoderImpl _value_coder_impl

 cdef class IntermediateOutputProcessor(OutputProcessor):
-    cdef Operation _consumer
+    pass

 cdef class FunctionOperation(Operation):
     cdef OutputProcessor _output_processor
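A rough plain-Python rendering of the declaration layout after this hunk (the real file is a Cython .pxd; Operation and StreamCoderImpl are the Beam types referenced above): the _consumer field moves up into the OutputProcessor base class, so subclasses inherit it instead of each redeclaring it.

    class OutputProcessor:
        _consumer: "Operation"  # shared by all output processors after this change

        def process_outputs(self, windowed_value, results):
            raise NotImplementedError

    class NetworkOutputProcessor(OutputProcessor):
        _value_coder_impl: "StreamCoderImpl"  # only the coder stays subclass-specific

    class IntermediateOutputProcessor(OutputProcessor):
        pass  # nothing left to declare locally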
2 changes: 1 addition & 1 deletion flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx

@@ -19,13 +19,13 @@
 # cython: infer_types = True
 # cython: profile=True
 # cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True
-from apache_beam.runners.worker.bundle_processor import DataOutputOperation
 from libc.stdint cimport *
 from apache_beam.coders.coder_impl cimport OutputStream as BOutputStream
 from apache_beam.utils cimport windowed_value
 from apache_beam.utils.windowed_value cimport WindowedValue
 from pyflink.fn_execution.coder_impl_fast cimport InputStreamWrapper

+from apache_beam.runners.worker.bundle_processor import DataOutputOperation
 from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkLengthPrefixCoderBeamWrapper
 from pyflink.fn_execution.table.operations import BundleOperation
 from pyflink.fn_execution.profiler import Profiler
13 changes: 7 additions & 6 deletions flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
@@ -21,6 +21,7 @@

 from apache_beam.runners.worker.bundle_processor import TimerInfo, DataOutputOperation
 from apache_beam.runners.worker.operations import Operation
+from apache_beam.utils import windowed_value
 from apache_beam.utils.windowed_value import WindowedValue

 from pyflink.fn_execution.table.operations import BundleOperation
@@ -147,20 +148,20 @@ def generate_operation(self):
 class StatefulFunctionOperation(FunctionOperation):
     def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls,
                  keyed_state_backend):
-        self.keyed_state_backend = keyed_state_backend
+        self._keyed_state_backend = keyed_state_backend
+        self._reusable_windowed_value = windowed_value.create(None, -1, None, None)
         super(StatefulFunctionOperation, self).__init__(
             name, spec, counter_factory, sampler, consumers, operation_cls)

     def generate_operation(self):
-        return self.operation_cls(self.spec, self.keyed_state_backend)
+        return self.operation_cls(self.spec, self._keyed_state_backend)

     def add_timer_info(self, timer_family_id: str, timer_info: TimerInfo):
         # ignore timer_family_id
         self.operation.add_timer_info(timer_info)

     def process_timer(self, tag, timer_data):
-        output_stream = self.consumer.output_stream
-        self._value_coder_impl.encode_to_stream(
+        self._output_processor.process_outputs(
+            self._reusable_windowed_value,
             # the field user_key holds the timer data
-            self.operation.process_timer(timer_data.user_key), output_stream, True)
-        output_stream.maybe_flush()
+            self.operation.process_timer(timer_data.user_key))
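Putting the pieces together: __init__ now stores the backend under the private name _keyed_state_backend and pre-allocates a single reusable WindowedValue via windowed_value.create(None, -1, None, None), and process_timer hands encoding and flushing to the shared output processor instead of writing to the output stream itself. A sketch built only from this diff (the _output_processor attribute comes from the FunctionOperation base class declared in the .pxd above):

    def process_timer(self, tag, timer_data):
        # The shared output processor owns encoding and flushing, so no
        # direct output_stream access remains here; the reusable wrapper
        # avoids allocating a WindowedValue per timer firing.
        self._output_processor.process_outputs(
            self._reusable_windowed_value,
            # the field user_key holds the timer data
            self.operation.process_timer(timer_data.user_key))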
