Contributor

Vancior commented Jul 4, 2022

What is the purpose of the change

This PR implements keyed broadcast processing (KeyedStream.connect(BroadcastStream).process(KeyedBroadcastProcessFunction)) in PyFlink.

Brief change log

  • add KeyedBroadcastProcessFunction interface in Python
  • introduce KeyedBroadcastStateTransformation and its translator to compile keyed broadcast processing into PythonKeyedCoProcessor or PythonBatchKeyedCoBroadcastProcessOperator.
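
For readers unfamiliar with the pattern, the semantics of keyed broadcast processing can be sketched in plain Python. This is a toy simulation, not the PyFlink API: `ThresholdAlert`, `run_keyed_broadcast`, and the event tuples are hypothetical names made up for illustration. It assumes the usual broadcast-state contract, where the broadcast side may write the shared state and the keyed side only reads it while keeping its own per-key state.

```python
from collections import defaultdict
from types import MappingProxyType
from typing import Any, Dict, List, Mapping, Tuple


class ThresholdAlert:
    """Hypothetical stand-in for a keyed broadcast process function."""

    def process_broadcast_element(
        self, rule: Tuple[str, int], broadcast_state: Dict[str, int]
    ) -> List[str]:
        # Broadcast side: allowed to modify the shared broadcast state.
        name, value = rule
        broadcast_state[name] = value
        return []

    def process_element(
        self,
        key: str,
        value: int,
        broadcast_state: Mapping[str, int],
        keyed_state: Dict[str, int],
    ) -> List[str]:
        # Keyed side: updates per-key state, reads broadcast state only.
        keyed_state[key] += value
        limit = broadcast_state.get('limit')
        if limit is not None and keyed_state[key] > limit:
            return [f'{key} exceeded {limit}: {keyed_state[key]}']
        return []


def run_keyed_broadcast(events: List[Tuple[str, Any]], fn: ThresholdAlert) -> List[str]:
    """Feed ('broadcast', rule) and ('keyed', (key, value)) events to fn in order."""
    broadcast_state: Dict[str, int] = {}
    keyed_state: Dict[str, int] = defaultdict(int)
    # Read-only view handed to the keyed side (assumed contract, see lead-in).
    read_only_view = MappingProxyType(broadcast_state)
    output: List[str] = []
    for side, payload in events:
        if side == 'broadcast':
            output.extend(fn.process_broadcast_element(payload, broadcast_state))
        else:
            key, value = payload
            output.extend(fn.process_element(key, value, read_only_view, keyed_state))
    return output
```

With the events `('keyed', ('a', 5))`, `('broadcast', ('limit', 7))`, `('keyed', ('a', 4))`, `('keyed', ('b', 3))`, the sketch emits a single alert for key `a`, because the limit only becomes visible to the keyed side after the broadcast element has been processed.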

Verifying this change

This change added tests and can be verified as follows:

  • integration test test_keyed_co_broadcast_process in test_data_stream.py

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (Python Sphinx doc)

Collaborator

flinkbot commented Jul 4, 2022

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

__all__ = ['CloseableIterator', 'DataStream', 'KeyedStream', 'ConnectedStreams', 'WindowedStream',
'DataStreamSink', 'CloseableIterator', 'BroadcastStream', 'BroadcastConnectedStream']

from pyflink.util.java_utils import to_jarray
Contributor

move the import before __all__

:param ds: The DataStream with which this stream will be connected.
:return: The ConnectedStreams.
.. versionchanged:: 1.16.0
Support connect BroadcastStream
Contributor

Suggested change
Support connect BroadcastStream
Support connect BroadcastStream

Comment on lines +1397 to +1398
:param ctx: A :class:`BroadcastProcessFunction.ReadOnlyContext` that allows querying the
timestamp of the element, querying the current processing/event time and reading the
Contributor

Suggested change
:param ctx: A :class:`BroadcastProcessFunction.ReadOnlyContext` that allows querying the
timestamp of the element, querying the current processing/event time and reading the
:param ctx:
A :class:`BroadcastProcessFunction.ReadOnlyContext` that allows querying the
timestamp of the element, querying the current processing/event time and reading the

* org.apache.flink.streaming.runtime.translators.python.PythonKeyedBroadcastStateTransformationTranslator}.
*/
@Internal
public class PythonKeyedBroadcastStateTransformation<OUT>
Contributor

Could PythonKeyedBroadcastStateTransformation extend PythonBroadcastStateTransformation?

Contributor Author

I tried that before, but it runs into class-casting problems that I could not solve.

* @param <OUT> The output type of the CoBroadcastProcess function
*/
@Internal
public class PythonBatchKeyedCoBroadcastProcessOperator<OUT>
Contributor

add serialVersionUID

public class PythonBatchKeyedCoBroadcastProcessOperator<OUT>
extends PythonKeyedCoProcessOperator<OUT> implements BoundedMultiInput, InputSelectable {

private transient volatile boolean isBroadcastSideDone;
Contributor

Suggested change
private transient volatile boolean isBroadcastSideDone;
private transient volatile boolean isBroadcastSideDone = false;
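
The `isBroadcastSideDone` flag, together with the `BoundedMultiInput` and `InputSelectable` interfaces, suggests that in batch mode the operator reads the broadcast input to completion before it touches the keyed input. The Python sketch below illustrates that ordering under this assumption. It is not the operator's code: `BatchInputSelector`, `drain_in_batch_order`, and the side labels are hypothetical names.

```python
from typing import Any, Iterable, List, Tuple

BROADCAST, KEYED = 'broadcast', 'keyed'


class BatchInputSelector:
    """Hypothetical model of the input-selection logic, not Flink code."""

    def __init__(self) -> None:
        # Starts as False, mirroring the explicit initializer in the suggestion.
        self.is_broadcast_side_done = False

    def next_selection(self) -> str:
        # Read the broadcast side until it is finished, then the keyed side.
        return KEYED if self.is_broadcast_side_done else BROADCAST

    def end_input(self, side: str) -> None:
        # Only the end of the broadcast input flips the flag.
        if side == BROADCAST:
            self.is_broadcast_side_done = True


def drain_in_batch_order(
    broadcast: Iterable[Any], keyed: Iterable[Any]
) -> List[Tuple[str, Any]]:
    """Return the elements of both inputs in the order the selector reads them."""
    selector = BatchInputSelector()
    inputs = {BROADCAST: iter(broadcast), KEYED: iter(keyed)}
    finished = set()
    order: List[Tuple[str, Any]] = []
    while len(finished) < len(inputs):
        side = selector.next_selection()
        try:
            order.append((side, next(inputs[side])))
        except StopIteration:
            selector.end_input(side)
            finished.add(side)
    return order
```

Under this model every keyed element sees the complete broadcast state, which is why the flag only needs to change once, when the broadcast input ends.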

Vancior force-pushed the feat/py_keyed_broadcast branch from f292e4b to 7edad7b on July 15, 2022 03:59
JunRuiLee pushed a commit to JunRuiLee/flink that referenced this pull request Jul 20, 2022
liujiawinds pushed a commit to liujiawinds/flink that referenced this pull request Jul 22, 2022
huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022