Skip to content

Conversation

@Vancior
Copy link
Contributor

@Vancior Vancior commented Jun 6, 2022

What is the purpose of the change

This PR implements non-keyed co-broadcast processing interfaces, including BroadcastStream, BroadcastConnectedStream and BroadcastProcessFunction.

Brief change log

  • add BroadcastStream, BroadcastConnectedStream, BroadcastProcessFunction and related internal classes
  • add an distinct CO_BROADCAST_PROCESS function type for broadcast processing
  • introduce PythonBroadcastStateTransformation and corresponding translator to translate python broadcast process into different operators for batch or streaming

Verifying this change

This change added tests and can be verified as follows:

  • test_co_broadcast_process in test_data_stream.py

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (Python generated doc)

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 6, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@HuangXingBo HuangXingBo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Vancior Thanks a lot for the PR. I have left some comments.

JPythonBroadcastStateTransformation = (
gateway.jvm.org.apache.flink.streaming.api.transformations.python
).PythonBroadcastStateTransformation
j_state_names = ListConverter().convert(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the typeinfo set in MapDescriptor have any effect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually no, but the base class of PythonBroadcastStateTransformation, AbstractBroadcastStateTransformation needs this for constructor, so this's just for expected behavior of the base class, althrough in PyFlink we are not restricting which broadcast states can be accessed during runtime.

serialized_fn.map_state_write_cache_size,
)
else:
operator_state_backend = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When serialized_fn is not a instance of flink_fn_execution_pb2.UserDefinedDataStreamFunction

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only DataStream API can use operator state, so I guess that's okay

raise Exception('Cannot override partitioning for KeyedStream.')

def broadcast(self) -> 'DataStream':
def broadcast(self, *args) -> Union['DataStream', 'BroadcastStream']:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we not going to support broadcast on keyedstream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's will be another PR.

def broadcast(self, *args) -> 'BroadcastStream':
pass

def broadcast(self, *args):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add the typehint too?

pass

@overload
def broadcast(self, *args) -> 'BroadcastStream':
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add the typeint MapStateDescriptor?

raise Exception('Cannot override partitioning for KeyedStream.')

def broadcast(self) -> 'DataStream':
def broadcast(self, *args):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

def connect(self, ds: 'BroadcastStream') -> 'BroadcastConnectedStream':
pass

def connect(self, ds):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add the typeint Union['DataStream', 'BroadcastStream']?

@Vancior Vancior force-pushed the feat/py_broadcast_process branch from 25d5966 to 4404d2a Compare June 27, 2022 01:43
Copy link
Contributor

@HuangXingBo HuangXingBo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Vancior I have only one comment. And I think you can rebase master and push again.

@Vancior Vancior force-pushed the feat/py_broadcast_process branch from 4404d2a to f98551b Compare June 29, 2022 02:17
@Vancior Vancior changed the title [FLINK-27584][python] Support non-keyed co-broadcast processing [FLINK-27586][python] Support non-keyed co-broadcast processing Jun 30, 2022
zhuyufeng0809 pushed a commit to zhuyufeng0809/flink that referenced this pull request Jul 1, 2022
ericccarlson pushed a commit to ericccarlson/flink that referenced this pull request Jul 11, 2022
liujiawinds pushed a commit to liujiawinds/flink that referenced this pull request Jul 22, 2022
MartijnVisser pushed a commit to sunshineJK/flink that referenced this pull request Jul 29, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants