New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-16337][python][table-planner][table-planner-blink] Add support of vectorized Python UDF in blink planner and old planner #11252
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit ec77755 (Fri Feb 28 13:09:25 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
10b2004
to
d90b0eb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dianfu Thanks a lot for the PR. Overall it looks good to me except that I'm wondering if we can reuse the current PythonCalRule and PythonCalRelNode. The reasons are:
- Most code between
PythonCalcRule
andArrowPythonCalcRule
,PythonCalRelNode
andArrowPythonCalRelNode
are same. - The Rule even doesn't need to be changed if we reuse the Rule and RelNode.
- The change in the RelNode would be also small, e.g., adding some
if else
to load the corresponding runtime operator.
Besides, is it possible to support pandas udf for batch mode in old planner?
What do you think?
@@ -32,51 +33,85 @@ object PythonUtil { | |||
* @param node the RexNode to check | |||
* @return true if it contains the Python function call in the specified node. | |||
*/ | |||
def containsPythonCall(node: RexNode): Boolean = node.accept(new FunctionFinder(true, true)) | |||
def containsPythonCall(node: RexNode): Boolean = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about merging the two containsPythonCall
. I see two options:
- Add a default parameter for
pythonFunctionKind
. - Add a GENERAL_PANDAS enum type in
PythonFunctionKind
.
/** | ||
* Rule that converts [[FlinkLogicalCalc]] to [[BatchExecArrowPythonCalc]]. | ||
*/ | ||
class BatchExecArrowPythonCalcRule |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about reusing the current PythonCalcRule and PythonCalRelNode? Most code between PythonCalcRule
and ArrowPythonCalcRule
, PythonCalRelNode
and ArrowPythonCalRelNode
are same. The Rules even don't need to be changed.
Besides, we may need to convert this class to Java? In the long term, it's better to avoid new scala classes. Also see discussion here: #11051 (comment)
What do you think?
ret, getPythonWorkerMemory(planner.getTableConfig.getConfiguration)) | ||
} | ||
|
||
private def getPythonWorkerMemory(config: Configuration): Long = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is copied from BatchExecPythonCalc
. How about putting this method into the CommonPythonCalc
?
import org.apache.flink.table.functions.{FunctionContext, ScalarFunction, TableFunction} | ||
import org.apache.flink.types.Row | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary changes.
…Python UDF in blink planner
… UDF in old planner
@hequn8128 Thanks a lot for your great review and suggestions. That makes much sense to me and have updated the PR accordingly. Regarding to the support of pandas udf for batch mode in old planner, I'd like to add it in a separate PR as the operator for this case is still not added. What's your thoughts? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dianfu Thanks a lot for the update. LGTM. Will merge this once test passed.
… UDF in old planner This closes apache#11252.
What is the purpose of the change
This pull request adds the relNodes and rules to support vectorized Python UDF in blink planner and old planner.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation