
[FLINK-14014][python] Introduce PythonScalarFunctionRunner to handle the communication with Python worker for Python ScalarFunction execution #9653

Closed
wants to merge 4 commits into from

Conversation


@dianfu dianfu commented Sep 9, 2019

What is the purpose of the change

This pull request introduces PythonScalarFunctionRunner to handle the communication with Python worker for Python ScalarFunction execution. Internally, it uses Apache Beam's portability framework for Python UDF execution.

Brief change log

  • It bumps maven-shade-plugin to 3.2.1, as versions below 3.1.0 do not work well with ASM 6.0 (MSHADE-258), on which Beam depends
  • It introduces basic classes such as PythonEnv, PythonFunction and PythonFunctionInfo into flink-table-common. They hold information such as the Python execution environment, the serialized Python functions, etc. These classes are located in flink-table-common because they will be used by the RelNodes which will be introduced later
  • It introduces a series of PythonFunctionRunners, which are responsible for using Beam's portability framework for Python UDF execution
  • It handles the license statements for the third-party dependencies introduced by the dependency on Beam

Verifying this change

This change adds tests and can be verified as follows:

  • The added PythonScalarFunctionRunnerTest and BeamTypeUtilsTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)


flinkbot commented Sep 9, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit d3fa2f9 (Wed Oct 16 08:37:36 UTC 2019)

Warnings:

  • 2 pom.xml files were touched: Check for build and licensing issues.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier


flinkbot commented Sep 9, 2019

CI report:

@hequn8128 hequn8128 left a comment

@dianfu Thanks a lot for the nice PR. I have finished the review of the first commit (see comments below) and will continue to review the next ones straight away.

Best, Hequn

pom.xml Outdated
@@ -1667,7 +1669,7 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<version>3.1.0</version>
Contributor:

Should we bump it to 3.2.1? See FLINK-13453. It would also be great to update the documentation about maven-shade-plugin in dependencies.md.

@hequn8128 hequn8128 left a comment

Add comments for the second commit.

/**
* Python execution environments.
*/
public final class PythonEnv implements Serializable {
Contributor:

Add @Internal

// python function is executed in an external non-managed process
EXTERNAL,

UNKNOWN,
Contributor:

Remove the , after UNKNOWN

Contributor Author:

What about removing the UNKNOWN type?

@hequn8128 hequn8128 Sep 16, 2019:

Sounds good. How about removing DOCKER and EXTERNAL for now as well? It seems we are going to only support PROCESS first?

Contributor Author:

+1


private static final long serialVersionUID = 1L;

private final String pythonExec;
Contributor:

Add comments for these member variables? It would make them easier to understand.

Contributor:

Thanks a lot for adding the comments. Maybe rename the variable to pythonExecFilePath? Same for pipExec.


/**
* PythonFunctionInfo contains the execution information of a Python function, such as:
* the actual Python function, the input arguments, the input types, the result type, etc.
Contributor:

Should we make the comment consistent with the implementation of the class? It seems this class only contains the Python function and its input indexes, i.e., it does not contain the input types or result type.

Contributor Author:

Good catch. Originally it contained the input types and result type. I forgot to update the comments when removing those fields.

@hequn8128 hequn8128 left a comment

Add comments for the third commit (Coder part; FunctionRunner part later).

// User-defined function definition. It supports chaining functions, that is, the execution
// result of one user-defined function can be used as the input of another user-defined function.
message UserDefinedFunction {
message Input {
Contributor:

Should we use two spaces as the indent here? I find that is the common convention in proto files.

Contributor Author:

Makes sense to me.

ARRAY = 15;
MAP = 16;
MULTISET = 17;
ROW = 18;
Contributor:

Tag numbers 1-15 require one less byte to encode than higher numbers, so should we move ROW to a number less than 16? The ROW type is frequently used.

@dianfu dianfu Sep 11, 2019:

Row is a special type compared to the other types, as it can be used to represent the row type of a table. So it seems more reasonable to put it at the end of the type list. What do you think?

Contributor:

New types will be added later, and we can't change the tag number for ROW at that time, so we can't make sure it stays last.
If we want to separate ROW from the others, we can put it at the front, which also encodes ROW with one less byte. (My main concern is performance; see here about the tag number.)
What do you think?

Contributor Author:

Good catch on the performance issue. Makes sense to me. +1 to put it at the front of the list.
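The one-byte boundary discussed above can be checked directly: a protobuf field key is the varint encoding of (field_number << 3) | wire_type, so keys for field numbers 1-15 fit in a single byte. A minimal sketch (hypothetical helper class, no protobuf dependency; the wire-type value below is only for illustration):

```java
public class ProtoTagSize {

    // Number of bytes in the varint encoding of v (treated as unsigned).
    static int varintSize(int v) {
        int bytes = 1;
        while ((v & ~0x7F) != 0) {
            v >>>= 7;
            bytes++;
        }
        return bytes;
    }

    // Size in bytes of the encoded field key: (field_number << 3) | wire_type.
    static int tagSize(int fieldNumber, int wireType) {
        return varintSize((fieldNumber << 3) | wireType);
    }

    public static void main(String[] args) {
        System.out.println(tagSize(15, 2)); // key = 122, fits in 1 byte
        System.out.println(tagSize(16, 2)); // key = 130, needs 2 bytes
        System.out.println(tagSize(18, 2)); // ROW at tag 18 also needs 2 bytes
    }
}
```

This is why moving a frequently used type like ROW below 16 (e.g. to the front of the list) saves one byte per encoded field key.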

* A {@link Coder} for BIGINT.
*/
@Internal
public class BigIntCoder extends Coder<Long> {
Contributor:

Maybe we can use the VarLongCoder from Beam directly?

* A {@link Coder} for {@link Row}.
*/
@Internal
public class RowCoder extends Coder<Row> {
Contributor:

Override consistentWithEquals() to return true?

}

for (int i = 0; i < row.getArity(); i++) {
fieldCoders[i].encode(row.getField(i), outStream);
Contributor:

We should take null fields into consideration; Coders in Beam cannot encode null values. Same for BaseRowCoder.
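One common way to address this, sketched here with hypothetical names and a simplified Object[] row rather than the actual RowCoder implementation, is to write a null mask before the fields so the per-field coders never see null:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class NullAwareRowEncoding {

    interface FieldCoder {
        void encode(Object value, DataOutputStream out) throws IOException;
    }

    // Write one null flag per field first, then encode only the non-null fields.
    static byte[] encode(Object[] row, FieldCoder[] fieldCoders) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        for (Object field : row) {
            out.writeBoolean(field == null);        // null mask: 1 byte per field
        }
        for (int i = 0; i < row.length; i++) {
            if (row[i] != null) {
                fieldCoders[i].encode(row[i], out); // coder never receives null
            }
        }
        return baos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        FieldCoder longCoder = (value, out) -> out.writeLong((Long) value);
        byte[] bytes = encode(
            new Object[] {42L, null},
            new FieldCoder[] {longCoder, longCoder});
        System.out.println(bytes.length); // 2 mask bytes + 8 bytes for the long = 10
    }
}
```

Decoding would read the mask first; a production coder might use a bit-packed mask instead of one byte per field.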

}
}

private static class LogicalTypeToBlinkCoderConverter implements LogicalTypeVisitor<Coder> {
Contributor:

Make this class extend LogicalTypeToCoderConverter? Most parts of the two classes are the same.

* Utilities for converting Flink data types to Beam data types.
*/
@Internal
public final class BeamTypeUtils {
Contributor:

Why only support BigIntType and RowType in these Converter classes?

Contributor Author:

I want to make sure this PR focuses only on the runner part. For the types, I want to add them in a separate PR. BigIntType and RowType are added in this PR because they are the minimum types needed to run end-to-end use cases. Does that make sense to you? If so, I will create a follow-up JIRA.

/**
* Tests for {@link BeamTypeUtils}.
*/
public class BeamTypeUtilsTest {
Contributor:

Also add tests for BaseRowCoder

@hequn8128 hequn8128 left a comment

Add comments for the third commit (FunctionRunner part).

Main comments are:

  • It seems we can use the interface in Beam directly to avoid creating duplicate interfaces. For example, we can replace PythonFunctionExecutionResultReceiver with FnDataReceiver.
  • Methods in PythonFunctionRunner have been implemented in its child classes but are not covered by any tests. Can we add some tests to cover these methods? Or maybe we can remove them for now and add them back later?

What do you think? @dianfu


/**
* A {@link PythonFunctionRunner} used to execute Python {@link ScalarFunction}s. It takes {@link BaseRow} as the
* input and output row type.
Contributor:

It takes {@link BaseRow} as the input and output type? (Also for the comment in PythonScalarFunctionRunner)

public class PythonScalarFunctionRunnerTest {

@Test
public void testInputOutputDataTypeConstructedProperlyForSingleUDF() {
Contributor:

The getInputCoder() and getOutputCoder() methods in BaseRowPythonScalarFunctionRunner have not been covered by any tests. We can add a similar test.

* create a Python execution environment for each operator.
*/
@Internal
public class DefaultPythonFunctionRunnerContext implements PythonFunctionRunnerContext {
Contributor:

It seems we can use DefaultExecutableStageContext from Beam directly? Then we can also remove the PythonFunctionRunnerContext interface.

* A receiver for the execution results of Python functions.
*/
@Internal
public interface PythonFunctionExecutionResultReceiver<T> {
Contributor:

Use FnDataReceiver from Beam directly.

/**
* Prepares the Python function runner, such as preparing the Python execution environment, etc.
*/
void open() throws Exception;
Contributor:

Can we add these methods later? I am asking because these methods have been implemented in the child classes but are not covered by any tests.
Or can we add some tests to cover them?

@dianfu dianfu Sep 11, 2019:

I'm in favor of keeping them, as they are a very important part of the functionality of PythonFunctionRunner. The integration tests will be added in FLINK-14018. I will try to add some mock tests in this PR.

@hequn8128:

@dianfu I have finished my review now. Thank you very much for the nice PR.

I have not reviewed the last commit as there are related failures in the CI report; I will look at it once the CI passes. Is that OK with you?

Also, would be great if @sunjincheng121 can also take a look.

Best, Hequn


dianfu commented Sep 11, 2019

@hequn8128 Thanks a lot for your review. Much appreciated! Most comments are reasonable to me, with just a few exceptions, which I have explained under the comments. Please see if that makes sense to you. Thanks again!

@dianfu dianfu force-pushed the FLINK-14014 branch 2 times, most recently from fde6b67 to 6b47272 on September 12, 2019 04:37
@hequn8128 hequn8128 left a comment

@dianfu Thanks a lot for the update. More comments for the third commit. I will take a look at the last commit right away.


for (int i = 0; i < row.getArity(); i++) {
if (!row.isNullAt(i)) {
fieldCoders[i].encode(TypeGetterSetters.get(row, i, fieldTypes[i]), outStream);
Contributor:

For BinaryRow, this may add redundant serialization/deserialization. We may add support for BaseRow on the Python side as well. The benefits include:

  • Avoiding redundant ser/de.
  • Users can use BaseRow in UDXs.

However, this may need a big change. I think we can add BaseRow support later. Could we add a TODO here for now?

Contributor Author:

+1

* A {@link Coder} for {@link BaseRow}.
*/
@Internal
public class BaseRowCoder extends Coder<BaseRow> {
Contributor:

Add some tests for BaseRowCoder and RowCoder?

public void finishBundle() {
try {
remoteBundle.close();
} catch (Throwable t) {
Contributor:

Add a finally block in this method and set remoteBundle to null.


@Override
public void close() throws Exception {
if (jobBundleFactory != null) {
Contributor:

Add a finally block in this method and set jobBundleFactory to null in the finally block.
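The suggestion in the two comments above can be sketched as follows (hypothetical field and resource names; the real runner wraps Beam's JobBundleFactory and RemoteBundle):

```java
public class SafeClose {

    static class Resource implements AutoCloseable {
        @Override
        public void close() throws Exception {
            throw new Exception("close failed");
        }
    }

    private Resource jobBundleFactory = new Resource();

    boolean isClosed() {
        return jobBundleFactory == null;
    }

    // The finally block clears the field even when close() throws,
    // so a later or repeated close() does not touch a dead resource.
    public void close() throws Exception {
        try {
            if (jobBundleFactory != null) {
                jobBundleFactory.close();
            }
        } finally {
            jobBundleFactory = null;
        }
    }

    public static void main(String[] args) {
        SafeClose runner = new SafeClose();
        try {
            runner.close();
        } catch (Exception expected) {
            // close() failed, but the reference was still cleared
        }
        System.out.println(runner.isClosed()); // true
    }
}
```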

@Override
public void processElement(IN element) {
try {
baos.reset();
Contributor:

Add some log logic here, e.g., LOG.debug("Process element: {}", element)

Contributor Author:

Personally I think it is of little use:

  1. There may be multiple UDFs in one operator, and each UDF may refer to different columns of the original input. It's difficult for users to determine which column printed here corresponds to which column of the original input after all kinds of rule optimizations.
  2. There seems to be no built-in operator that logs the input elements.

Contributor:

OK, makes sense.

* the {@link StageBundleFactory} which is the entry point to manage the Python execution environment.
*/
@Internal
public interface PythonFunctionRunnerContext extends AutoCloseable {
Contributor:

Remove this interface? It seems not to be used.


@Override
public CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest request) {
return null;
Contributor:

Should we throw exceptions here (to indicate that the state request could not be handled correctly)? Maybe we can use StateRequestHandler.unsupported() directly. What do you think?

Contributor Author:

+1 to use StateRequestHandler.unsupported()

/**
* An {@link BundleProgressHandler} implementation which does nothing.
*/
private static class NoOpBundleProgressHandler implements BundleProgressHandler {
Contributor:

Remove this class. Use BundleProgressHandler.ignored() directly.

private static final String OUTPUT_CODER_ID = "output_coder";
private static final String WINDOW_CODER_ID = "window_coder";

private static final String WINDOW_STRATEGY = "windowing-strategy";
Contributor:

Change it to windowing_strategy to make it consistent with the other names?

@hequn8128 hequn8128 left a comment

@dianfu Comments for the last commit. I found some missing dependencies for the NOTICE file. The licenses folder looks good.

NOTICE-binary Outdated
- com.fasterxml.jackson.core:jackson-databind:2.9.9
- com.google.api.grpc:proto-google-common-protos:1.12.0
- com.google.code.gson:gson:2.7
- com.google.guava:guava:11.0.2
Contributor:

Change 11.0.2 to 26.0-jre? Guava is introduced by org.apache.beam:beam-vendor-guava-26_0-jre.

@@ -7986,16 +7986,102 @@ The Apache Software Foundation (http://www.apache.org/).
flink-python
Copyright 2014-2019 The Apache Software Foundation

This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)

Contributor:

It seems we have missed the following dependencies under the Apache 2.0 license:

  • joda-time:joda-time:2.5
  • io.netty:netty-buffer:4.1.34.Final
  • io.netty:netty-codec:4.1.34.Final
  • io.netty:netty-codec-http:4.1.34.Final
  • io.netty:netty-codec-http2:4.1.34.Final
  • io.netty:netty-codec-socks:4.1.34.Final
  • io.netty:netty-common:4.1.34.Final
  • io.netty:netty-handler:4.1.34.Final
  • io.netty:netty-handler-proxy:4.1.34.Final
  • io.netty:netty-resolver:4.1.34.Final
  • io.netty:netty-transport:4.1.34.Final
  • io.netty:netty-transport-native-unix-common:4.1.34.Final

The io.netty dependencies are introduced by org.apache.beam:beam-vendor-grpc-1_21_0


dianfu commented Sep 17, 2019

@hequn8128 Thanks a lot for the review. I have updated the PR accordingly. Looking forward to your feedback.

@hequn8128:

@dianfu Thanks a lot for the update. LGTM from my side.
Would be great if @sunjincheng121 can take another look.

@dianfu dianfu force-pushed the FLINK-14014 branch 3 times, most recently from 3bac78e to a087606 on September 17, 2019 09:29
@WeiZhong94 WeiZhong94 left a comment

@dianfu Thanks a lot for the PR. Great work! LGTM overall, with only one minor comment.

public abstract Coder<OUT> getOutputCoder();

private String createEmptyRetrievalToken() throws Exception {
final File retrievalToken = Files.createTempFile("retrieval_token", "json").toFile();
Contributor:

If this part runs in the TaskManager, a better location to store temporary files is getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(). And maybe we should delete the file after finishBundle? At the very least we should call retrievalToken.deleteOnExit() to ensure it is deleted when the JVM exits.
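A sketch of the suggested cleanup (the tmpDir parameter stands in for one of the TaskManager tmp directories; class and method names are illustrative, not the PR's final code):

```java
import java.io.File;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Path;

public class RetrievalTokenSketch {

    // Create the (empty) retrieval token under a caller-provided tmp dir and
    // register it for deletion on JVM exit as a safety net.
    static File createEmptyRetrievalToken(Path tmpDir) throws Exception {
        File retrievalToken = Files.createTempFile(tmpDir, "retrieval_token", ".json").toFile();
        retrievalToken.deleteOnExit(); // guards against leaks if never deleted explicitly
        try (FileWriter writer = new FileWriter(retrievalToken)) {
            writer.write("{\"manifest\": {}}");
        }
        return retrievalToken;
    }

    public static void main(String[] args) throws Exception {
        Path tmpDir = Files.createTempDirectory("tm-tmp");
        File token = createEmptyRetrievalToken(tmpDir);
        System.out.println(token.exists() && token.getName().startsWith("retrieval_token"));
    }
}
```

Deleting the file explicitly after finishBundle would still be preferable; deleteOnExit() only covers a clean JVM shutdown.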


dianfu commented Sep 18, 2019

@WeiZhong94 Thanks for the review. Makes sense; I have updated the PR.

@WeiZhong94:

@dianfu Thanks a lot for the update. LGTM and +1 to merge from my side.

@hequn8128:

@dianfu Great job! Merging...

hequn8128 pushed a commit to hequn8128/flink that referenced this pull request Sep 18, 2019
…the communication with Python worker for Python ScalarFunction execution

* Introduces a series of PythonFunctionRunners. They are responsible for using
  Beam's portability framework for Python UDF execution.
* Handles the license statements for the third-party dependencies that are
  introduced because of the dependency on Beam.

This closes apache#9653
@asfgit asfgit closed this in cc3a27b Sep 18, 2019
@tillrohrmann:

Please be a bit more careful when bumping build plugin versions. The introduced version 3.2.1 seems to fail the builds for our end-to-end tests: https://travis-ci.org/apache/flink/builds/591085384. When making this kind of change, I would suggest doing it in a separate PR and running at least every end-to-end test on Travis before merging.


dianfu commented Sep 30, 2019

@tillrohrmann I really appreciate your suggestions; that makes sense to me. I will be more careful with this kind of change next time, e.g. doing it in a separate PR and making sure the end-to-end tests pass.

@hequn8128:

@tillrohrmann Thanks a lot for the notice. Will be more careful next time.

@dianfu dianfu deleted the FLINK-14014 branch June 10, 2020 03:05