[FLINK-22915][FLIP-173] Update Flink ML API to support AlgoOperator with multiple input tables and multiple output tables #4

Merged
merged 1 commit into apache:master on Sep 26, 2021

Conversation

lindong28
Member

What is the purpose of the change

This PR updates the Flink ML API according to FLIP-173.

Brief change log

This PR makes the following changes (a condensed sketch of the resulting interfaces follows the list below):

  • Added the AlgoOperator class. AlgoOperator has the same interface as the existing Transformer (i.e. it provides the transform(...) API).
  • Updated the fit/transform methods to take a list of tables as input and return a list of tables as output.
  • Added setModelData and getModelData to the Model interface.
  • Removed the methods PipelineStage::toJson and PipelineStage::loadJson. Added the methods save(...) and load(...) to the Stage interface.
  • Removed TableEnvironment from the parameter list of the fit/transform APIs.
  • Added PipelineModel and made Pipeline implement only Estimator. Pipeline is no longer a Transformer.
  • Removed Pipeline::appendStage from the Pipeline class.
  • Renamed PipelineStage to Stage and added the PublicEvolving annotation to the Stage interface.
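
Below is a condensed sketch of the API shape described above, for reference only. The signatures are approximate: the actual interfaces also carry generics and the parameter interface mentioned later in this PR, which are omitted here, and the static load(...) counterpart is only indicated in a comment.

```java
import org.apache.flink.table.api.Table;

/** Base interface for all pipeline stages; replaces the old PipelineStage. */
interface Stage {
    // Replaces PipelineStage::toJson; each concrete stage is expected to
    // provide a static load(path) counterpart replacing PipelineStage::loadJson.
    void save(String path);
}

/** Operator that maps a list of input tables to a list of output tables. */
interface AlgoOperator extends Stage {
    Table[] transform(Table... inputs);
}

/** A Transformer is an AlgoOperator with data-transformation semantics. */
interface Transformer extends AlgoOperator {}

/** A Model is a Transformer whose model data can be set and read as tables. */
interface Model extends Transformer {
    void setModelData(Table... modelData);
    Table[] getModelData();
}

/** An Estimator fits a list of input tables and produces a Model. */
interface Estimator<M extends Model> extends Stage {
    M fit(Table... inputs);
}
```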

Verifying this change

This PR focuses on changing the Flink ML API. Some APIs, such as save/load, need to be implemented after we complete the design of the parameter interface.

We will have follow-up PRs to implement all of the Flink ML APIs and provide better test coverage.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (Java doc)

@lindong28
Member Author

@becketqin Would you have time to review this PR?

@lindong28 lindong28 left a comment (Member Author)

@zhipeng93 Thanks for the review :)

Can we defer the review and code of save/load, and also the related implementation of Pipeline/PipelineModel, to a later PR? The code/tests for these features are available at https://github.com/lindong28/flink-ml/tree/flink-ml if you want to take a look earlier to address the concerns regarding e.g. the lazy sink.

@zhipeng93
Contributor

> @zhipeng93 Thanks for the review :)
>
> Can we defer the review and code of save/load, and also the related implementation of Pipeline/PipelineModel, to a later PR? The code/tests for these features are available at https://github.com/lindong28/flink-ml/tree/flink-ml if you want to take a look earlier to address the concerns regarding e.g. the lazy sink.

@lindong28 Thanks :)
I checked the source code in the above link.
I am still curious and concerned about how save(String path) & load(String path) work in a distributed setting.
There are also some other related issues, such as failover, if we are not using a sink in Flink.

@lindong28
Member Author

@zhipeng93 Sure. Can you explain the concerns? And do you think these concerns need to be addressed in this PR?

@becketqin becketqin left a comment (Contributor)

@lindong28 Thanks for the patch. It looks like CI is not working properly for this repo; we probably need to fix that. I tried to build the project on my local machine after fixing the package path issue, but the build still failed in the flink-ml-examples-streaming module. It looks like we have a Scala dependency there, which should probably be removed completely. Have you tried building the project locally? Did it pass?

@lindong28
Member Author

Thanks for the review @becketqin.

I ran git clean -dfx && mvn package on my local machine after fixing the package path. It works well without reporting any flink-ml-examples-streaming failure. Can you try again after doing git clean -dfx?

Yes, we can remove the Scala dependency later. For now let's keep it, because the Alink infra used for KMeans (see https://github.com/zhipeng93/flink-ml/tree/alink) contains Scala code.

@lindong28 lindong28 force-pushed the FLINK-22915 branch 2 times, most recently from 8c53d5b to 76fd1a5 on September 24, 2021 13:11
@lindong28
Member Author

@becketqin After looking into the package path issue, I think the CI is actually working correctly.

Here is what I found:

  1. I tried mvn package with that package path locally and it works, so the CI is working as expected.

  2. Previously, the file PipelineModel.java defined the PipelineModel class under the package path org.apache.flink.ml.api.pipeline, and all other classes (e.g. Pipeline) imported the PipelineModel class using that same package path. This could explain why mvn package works.

  3. What puzzled us is that the class org.apache.flink.ml.api.pipeline.PipelineModel is defined in the file at the path org/apache/flink/ml/api/core/PipelineModel.java. However, after searching for related information on Google, I found no documentation that says the package path of a class must be consistent with the path of the file that defines the class, so that setup is technically OK (see the illustration below).
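
For illustration, here is a minimal sketch of the situation in point 3, using a stripped-down placeholder body rather than the real class: the declared package does not match the directory under src/main/java, yet the file still compiles because Maven passes the source files to javac explicitly, which matches the observation above.

```java
// File location: src/main/java/org/apache/flink/ml/api/core/PipelineModel.java
// The declared package below does not match the "core" directory. javac does
// not require the two to agree for explicitly listed source files, so
// mvn package succeeds, and other classes can simply import
// org.apache.flink.ml.api.pipeline.PipelineModel.
package org.apache.flink.ml.api.pipeline;

public class PipelineModel {
    // Stripped-down placeholder body for illustration only.
}
```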

@becketqin
Contributor

Thanks for the patch. LGTM.

@becketqin becketqin merged commit 5ff346e into apache:master Sep 26, 2021
@lindong28 lindong28 deleted the FLINK-22915 branch September 26, 2021 13:26