Vancior
Contributor

@Vancior Vancior commented Feb 23, 2023

What is the purpose of the change

This PR adds support for side outputs in the broadcast processing case in PyFlink.
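As a rough illustration of the user-facing convention this feature relies on, the sketch below models in plain Python how a function that yields both bare values and `tag, value` pairs can be split into a main output and side outputs. `OutputTag` here is a hypothetical stand-in class and `route` is an illustrative helper; neither is PyFlink's actual implementation, and the real runtime may detect tagged records differently.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


class OutputTag:
    """Hypothetical stand-in for a side-output tag, identified by its name."""

    def __init__(self, tag_id: str) -> None:
        self.tag_id = tag_id


def route(results: Iterable[Any]) -> Tuple[List[Any], Dict[str, List[Any]]]:
    """Split yielded results into the main output and per-tag side outputs."""
    main: List[Any] = []
    side: Dict[str, List[Any]] = defaultdict(list)
    for item in results:
        # Assumption: a 2-tuple whose first element is an OutputTag is a side output.
        if isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], OutputTag):
            side[item[0].tag_id].append(item[1])
        else:
            main.append(item)
    return main, dict(side)


tag = OutputTag("side")


def process_element(value):
    # Same shape as the test snippet in this PR: one main record, one tagged record.
    yield value[1]
    yield tag, value[0]


main, side = route(r for v in [(1, "a"), (2, "b")] for r in process_element(v))
print(main)  # ['a', 'b']
print(side)  # {'side': [1, 2]}
```

In a real job the same `yield` pattern would sit inside the broadcast process function's element callbacks, and the side output would be read back from the resulting stream by its tag.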

Brief change log

  • add operator delegation when processing side outputs in transformations, since a broadcast transformation doesn't have a solid operator
  • move the delegated side-output tags to the solid operator in the translation stage, making the Python operator aware of the side outputs
  • adjust the embedded-mode code to support side outputs in the broadcast operator
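The delegation idea in the first two items can be sketched roughly as follows. This is a simplified plain-Python model, not the Java code in this PR: `DelegateOperator`, `SolidOperator`, `add_side_output_tag`, and `translate` are hypothetical names chosen for illustration. The delegate only records side-output tags while the job graph is being built, and the tags are handed to the real operator once translation creates it.

```python
from typing import List


class SolidOperator:
    """Hypothetical concrete operator that must know its side-output tags."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.side_output_tags: List[str] = []


class DelegateOperator:
    """Hypothetical placeholder used while no concrete operator exists yet."""

    def __init__(self) -> None:
        self.side_output_tags: List[str] = []

    def add_side_output_tag(self, tag_id: str) -> None:
        # Record each tag once; the order of first registration is kept.
        if tag_id not in self.side_output_tags:
            self.side_output_tags.append(tag_id)

    def copy(self) -> "DelegateOperator":
        # Mirrors the guard discussed in the review: a delegate is never copied.
        raise RuntimeError("This should not be invoked on a DelegateOperator!")


def translate(delegate: DelegateOperator, name: str) -> SolidOperator:
    """Create the concrete operator and move the delegated tags onto it."""
    operator = SolidOperator(name)
    operator.side_output_tags.extend(delegate.side_output_tags)
    return operator


delegate = DelegateOperator()
delegate.add_side_output_tag("late")
delegate.add_side_output_tag("errors")
delegate.add_side_output_tag("late")  # duplicate registration is ignored

operator = translate(delegate, "co-broadcast-process")
print(operator.side_output_tags)  # ['late', 'errors']
```

Keeping the delegate as a pure tag holder means callers can register side outputs before the concrete operator exists, at the cost of one extra hand-over step during translation.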

Verifying this change

This change added tests and can be verified as follows:

  • test_co_broadcast_side_output and test_keyed_co_broadcast_side_output in test_data_stream.py

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented Feb 23, 2023

CI report:

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot run azure: re-run the last Azure build

public <T> DataStreamPythonFunctionOperator<T> copy(
        DataStreamPythonFunctionInfo pythonFunctionInfo,
        TypeInformation<T> outputTypeInfo) {
    throw new RuntimeException("This should not be invoked on a DelegateOperator!");
Contributor

Maybe we should implement this. This method is used when performing operator chain optimization.

Contributor Author

Currently the Python broadcast operator does not support chaining with downstream operators, so I guess this check could be removed in another PR that supports broadcast chaining.


SimpleOperatorFactory<OUT> getOperatorFactory();

static void configureDelegatedOperator(
Contributor

How about renaming it to configure or something else? In this method we are actually configuring the operator, not the delegateOperator.

yield value[1]
yield tag, value[0]

self.env.set_parallelism(2)
Contributor

This is unnecessary as the parallelism is 2 by default. Refer to PyFlinkStreamingTestCase for more details.

Contributor Author

This is kind of an "explicit reminder" that we expect the output to match parallelism=2, with some elements duplicated.
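To make the duplication point concrete, here is a minimal plain-Python sketch, assuming the usual broadcast semantics in which every parallel subtask receives every broadcast element. `run_broadcast` is a hypothetical helper, not part of the test suite, and the actual expected results of the tests in this PR are not shown here.

```python
from typing import List


def run_broadcast(broadcast_elements: List[str], parallelism: int) -> List[str]:
    """Emit one record per broadcast element from each parallel subtask."""
    outputs: List[str] = []
    for _subtask in range(parallelism):
        # Assumption: each subtask sees the full broadcast stream.
        for element in broadcast_elements:
            outputs.append(element)
    # Sort so the result does not depend on subtask interleaving.
    return sorted(outputs)


print(run_broadcast(["x", "y"], 2))  # ['x', 'x', 'y', 'y']
```

With parallelism 1 each element would appear once, so an expected result containing duplicates silently depends on the parallelism setting, which is the reason given for setting it explicitly.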

@dianfu dianfu closed this in 8d52415 Mar 3, 2023
dianfu pushed a commit that referenced this pull request Mar 3, 2023
dianfu pushed a commit that referenced this pull request Mar 3, 2023
lindong28 pushed a commit to lindong28/flink that referenced this pull request Mar 4, 2023