Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 46 additions & 16 deletions sdks/python/apache_beam/runners/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,62 @@ cdef class Receiver(object):
cpdef receive(self, WindowedValue windowed_value)


cdef class DoFnRunner(Receiver):
cdef class DoFnMethodWrapper(object):
cdef public object args
cdef public object defaults
cdef public object method_value

cdef object dofn
cdef object dofn_process
cdef object window_fn

cdef class DoFnSignature(object):
cdef public DoFnMethodWrapper process_method
cdef public DoFnMethodWrapper start_bundle_method
cdef public DoFnMethodWrapper finish_bundle_method
cdef public object do_fn

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do validate functions need to be mentioned here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need to make these cdef methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. So, private methods should not be be defined in pxd files ?


cdef class DoFnInvoker(object):
cdef public DoFnSignature signature
cdef OutputProcessor output_processor

cpdef invoke_process(self, WindowedValue windowed_value)
cpdef invoke_start_bundle(self)
cpdef invoke_finish_bundle(self)

# TODO(chamikara) define static method create_invoker() here.


cdef class SimpleInvoker(DoFnInvoker):
cdef object process_method


cdef class PerWindowInvoker(DoFnInvoker):
cdef list side_inputs
cdef DoFnContext context
cdef list args_for_process
cdef dict kwargs_for_process
cdef list placeholders
cdef bint has_windowed_inputs
cdef object process_method


cdef class DoFnRunner(Receiver):
cdef DoFnContext context
cdef object tagged_receivers
cdef LoggingContext logging_context
cdef object step_name
cdef list args
cdef dict kwargs
cdef ScopedMetricsContainer scoped_metrics_container
cdef list side_inputs
cdef bint has_windowed_inputs
cdef list placeholders
cdef bint use_simple_invoker
cdef DoFnInvoker do_fn_invoker

cpdef process(self, WindowedValue windowed_value)

cdef Receiver main_receivers

cpdef process(self, WindowedValue element)
cdef _dofn_invoker(self, WindowedValue element)
cdef _dofn_simple_invoker(self, WindowedValue element)
cdef _dofn_per_window_invoker(self, WindowedValue element)
cdef class OutputProcessor(object):
cdef object window_fn
cdef Receiver main_receivers
cdef object tagged_receivers

@cython.locals(windowed_value=WindowedValue)
cpdef _process_outputs(self, WindowedValue element, results)
cpdef process_outputs(self, WindowedValue element, results)


cdef class DoFnContext(object):
Expand Down
Loading