
[FLINK-17013][python] Support Python UDTF in old planner under batch mode #11668

Merged: 3 commits into apache:master on Apr 21, 2020

Conversation

HuangXingBo (Contributor)

What is the purpose of the change

This pull request adds support for Python UDTFs in the old planner under batch mode.

Brief change log

  • Add rules to convert a RelNode into a DataSetPythonCorrelate node.
  • Add a PythonTableFunctionFlatMap function to invoke Python TableFunctions in the legacy planner (a usage sketch follows this list).
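For context, the sketch below shows roughly what this change enables from the Python Table API side. It is an illustrative example rather than code from this PR: the function, table, and field names are invented, and it assumes the PyFlink API of this release line, where a BatchTableEnvironment created from a DataSet ExecutionEnvironment runs on the old planner.

```python
# Illustrative sketch only; names and data are invented for this example.
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, DataTypes
from pyflink.table.udf import udtf


@udtf(input_types=DataTypes.STRING(),
      result_types=[DataTypes.STRING(), DataTypes.INT()])
def split(line):
    # A table function may emit zero or more rows per input row.
    for word in line.split(" "):
        yield word, len(word)


env = ExecutionEnvironment.get_execution_environment()
# A BatchTableEnvironment on top of a DataSet environment uses the old planner.
t_env = BatchTableEnvironment.create(env)
t_env.register_function("split", split)

t = t_env.from_elements([("hello world",), ("hello flink",)], ['line'])
# The lateral join against a Python UDTF is what the new
# DataSetPythonCorrelate translation supports in batch mode.
result = t.join_lateral("split(line) as (word, length)") \
          .select("line, word, length")
```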

Verifying this change

This change added tests and can be verified as follows:

  • Added integration tests PyFlinkBatchUserDefinedTableFunctionTests in test_udtf.py

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot (Collaborator)

flinkbot commented Apr 8, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 48fe02a (Wed Apr 08 08:58:21 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot (Collaborator)

flinkbot commented Apr 8, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@hequn8128 (Contributor) left a comment

@HuangXingBo Thanks a lot for the PR. Some feedback about the code duplication problem. Other parts look good overall. Besides, it would be great if you could rebase onto master. I will take a closer look then. Thank you.

@@ -81,6 +81,42 @@ class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedTableFunctionTests,
pass


class PyFlinkBatchUserDefinedTableFunctionTests(PyFlinkBatchTableTestCase):

def test_table_function(self):

I'm thinking about how to avoid this code duplication. Most of the code is the same; we only have to extract the differing parts into methods and override them in the child classes. For example, we can add a base method named get_output in UserDefinedTableFunctionTests and override it in PyFlinkBatchUserDefinedTableFunctionTests (see the sketch below).
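A minimal sketch of the suggested structure, assuming hypothetical helper names for the shared setup and assertions (only get_output and the class names come from this conversation):

```python
# Sketch of the suggested template-method structure; prepare_input_table and
# assert_results are hypothetical placeholders for the shared test logic.
class UserDefinedTableFunctionTests(object):

    def get_output(self, t):
        # Default (stream-planner) way of materializing the result table.
        ...

    def test_table_function(self):
        t = self.prepare_input_table()   # shared setup
        actual = self.get_output(t)      # the only planner-specific step
        self.assert_results(actual)      # shared assertions


class PyFlinkBatchUserDefinedTableFunctionTests(UserDefinedTableFunctionTests):
    # The real test case would also mix in PyFlinkBatchTableTestCase.

    def get_output(self, t):
        # Batch-planner-specific override.
        ...
```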

* The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to
* {@link DataSetPythonCorrelate}.
*/
public class DataSetPythonCorrelateRule extends ConverterRule {

Maybe add a common base class for the two classes, i.e., DataSetPythonCorrelateRule and DataStreamPythonCorrelateRule, to avoid the code duplication.

@HuangXingBo (Contributor, Author)

Thanks a lot for the review, @hequn8128. I have addressed the comments in the latest commit.

@hequn8128 (Contributor) left a comment

@HuangXingBo Thanks a lot for the update. The code looks much better and is in good shape now.

Comment on lines +79 to +86
def _register_table_sink(self, field_names: list, field_types: list):
table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
self.t_env.register_table_sink("Results", table_sink)

def _get_output(self, t):
t.insert_into("Results")
self.t_env.execute("test")
return source_sink_utils.results()

How about making these two the default implementations in the base class and only overriding them in PyFlinkBatchUserDefinedTableFunctionTests?
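For illustration, the suggested split might look roughly like this; the stream-side bodies are taken from the diff above, while the batch-side overrides are assumptions, not code from this PR:

```python
# Sketch only: the batch overrides below are assumptions for illustration.
from pyflink.testing import source_sink_utils


class UserDefinedTableFunctionTests(object):
    # Stream-planner defaults (bodies as shown in the diff above).

    def _register_table_sink(self, field_names: list, field_types: list):
        table_sink = source_sink_utils.TestAppendSink(field_names, field_types)
        self.t_env.register_table_sink("Results", table_sink)

    def _get_output(self, t):
        t.insert_into("Results")
        self.t_env.execute("test")
        return source_sink_utils.results()


class PyFlinkBatchUserDefinedTableFunctionTests(UserDefinedTableFunctionTests):
    # The real test case would also mix in PyFlinkBatchTableTestCase.

    def _register_table_sink(self, field_names: list, field_types: list):
        pass  # assumption: the batch tests may not need a test append sink

    def _get_output(self, t):
        # assumption: collect the batch table directly via a test-case helper
        return self.collect(t)
```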

@@ -39,16 +40,14 @@
import scala.Some;

/**
* The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to
* {@link DataStreamPythonCorrelate}.
* The abstract physical rule base is responsible for convert {@link FlinkLogicalCorrelate} to physical

responsible for converting

/**
* The factory is responsible to creating {@link DataStreamPythonCorrelate}.
* The abstract factory is responsible to creating {@link DataSetPythonCorrelate} or {@link DataStreamPythonCorrelate}.

is responsible for creating

import scala.Option;

/**
* The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to

responsible for converting

}

/**
* The factory is responsible to creating {@link DataStreamPythonCorrelate}.

responsible for creating

import scala.Option;

/**
* The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to

responsible for converting

}

/**
* The factory is responsible to creating {@link DataSetPythonCorrelate}.

responsible for creating

/**
* The factory is responsible to creating {@link DataStreamPythonCorrelate}.
*/
private static class DataStreamPythonCorrelateFactory extends PythonCorrelateFactoryBase{

Add a blank space here, i.e., PythonCorrelateFactoryBase {


@Override
public void bufferInput(Row input) {
// always copy the input Row

Maybe add more details about why we always copy the input Row. What do you think?

@HuangXingBo (Contributor, Author)

Thanks a lot for the review, @hequn8128. I have addressed the comments in the latest commit.

@hequn8128 (Contributor)

@HuangXingBo Thanks a lot for the update. Merging...

@hequn8128 hequn8128 merged commit bda9b77 into apache:master Apr 21, 2020