
[FLINK-12473][ml] Add the interface of ML pipeline and ML lib #8402

Closed
wants to merge 3 commits into from

Conversation


@ghost ghost commented May 10, 2019

What is the purpose of the change

This pull request introduces the major interfaces for ML pipeline and ML lib.

Brief change log

  • Create flink-ml module under flink root
  • Add flink-ml-api module, and introduce the major API interfaces, including PipelineStage, Estimator, Transformer, Model, Pipeline, etc.

Verifying this change

This change is pure interface design without any test coverage.

This PR only adds the ML pipeline & ML lib interfaces. The implementations of these interfaces will be added in the next PR (please refer to the “implementation plan” section of the FLIP-39 doc), where we will add the test cases.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented?
    • JavaDocs

    • Flink document (will submit via a separate PR)

Collaborator

flinkbot commented May 10, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

"Can not create a PipelineStage with a json without stageClassName signed");
}
try {
Persistable s = (Persistable) Class.forName(className).newInstance();
Contributor

It's better to supply a method where the user can specify a ClassLoader, and use Class.forName(className, true, classLoader), since the class might only be found in another ClassLoader.
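The reviewer's suggestion could be sketched roughly as follows. This is a hypothetical helper, not the actual flink-ml code; the class name `StageRestoreSketch` and method name `createStage` are illustrative assumptions:

```java
// Hedged sketch: restore a stage through a caller-supplied ClassLoader,
// as suggested above, instead of the default Class.forName(className) lookup.
class StageRestoreSketch {

    // Resolves the class through the given ClassLoader, so stages that live
    // only in a user/session ClassLoader can still be found.
    public static Object createStage(String className, ClassLoader classLoader) {
        try {
            Class<?> clazz = Class.forName(className, true, classLoader);
            // newInstance() is deprecated since Java 9; the constructor-based
            // form also reports checked exceptions more cleanly.
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot create PipelineStage " + className, e);
        }
    }
}
```

The three-argument `Class.forName(name, initialize, loader)` overload is the standard way to pass an explicit loader.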

}

// Find the last Estimator or Pipeline that needs fit in stages; -1 stands for no Estimator in the Pipeline
private int getIndexOfLastEstimator() {
Contributor

traverse in reverse order?

Contributor

+1. This should save some traversal time, since it is also used in the needFit() API.

Another way to think about it: since needFit() is used more often, we can make it

public boolean needFit() {
    return stages.stream().anyMatch(this::isStageNeedFit);
}

and just skip the use of the last index in public Pipeline fit(Table input).

Both approaches save traversal time in the case of long pipelines.

Member

can't we eagerly check and record that in Pipeline appendStage(PipelineStage) with a bool member variable?

Contributor

Yes, I think that's also a possibility. We just need to carefully maintain the code to limit which APIs can alter the "pipeline" (e.g. each time the pipeline changes, we need to update the state).

One more thought on this: we can always make the pipeline final, since every time we fit the pipeline, we create a new one. What do you think @c4emmmm

Author

I have updated the Pipeline and left a comment. It's outside the code page, so maybe you didn't see it. Sorry, I'm not familiar with GitHub; maybe there's a better way to let you know.
The new Pipeline has only final fields, except lastEstimatorIndex, which is an internal cache. Every stage must be added via the appendStage() method, including in the constructor and the fromJson() method. The return value of getStages() will be an immutable copy.
This makes it impossible to modify the stages list other than by appending a new stage via appendStage(). So validation, and finding the last estimator, can be done eagerly when a new stage is added. This should solve all your problems.
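The design described in this comment could be sketched roughly as below. This is a simplified stand-in, not the actual flink-ml code: the nested `Stage`/`Transformer`/`Estimator` marker interfaces are assumptions replacing the real API types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hedged sketch: stages are only mutable through appendStage(), which eagerly
// validates each stage and tracks the index of the last Estimator.
class PipelineSketch {
    interface Stage {}
    interface Transformer extends Stage {}
    interface Estimator extends Stage {}

    private final List<Stage> stages = new ArrayList<>();
    // Internal cache: -1 stands for "no Estimator in the Pipeline".
    private int lastEstimatorIndex = -1;

    public PipelineSketch appendStage(Stage stage) {
        if (stage instanceof Estimator) {
            lastEstimatorIndex = stages.size();
        } else if (!(stage instanceof Transformer)) {
            // Validation happens eagerly, at append time.
            throw new RuntimeException("All stages must be Estimator or Transformer");
        }
        stages.add(stage);
        return this;
    }

    // No scan needed: the index is maintained on every append.
    public boolean needFit() {
        return lastEstimatorIndex >= 0;
    }

    public List<Stage> getStages() {
        // Immutable view, so the list cannot change behind the pipeline's back.
        return Collections.unmodifiableList(stages);
    }
}
```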

Contributor

@walterddr walterddr left a comment

Thanks for the contribution @c4emmmm. The change looks good to me overall, and it follows FLIP-39 pretty nicely. I just left some detailed comments on some default implementations. Please kindly take a look when you have time.

interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable,
Persistable {

default String toJson() {
Contributor

How about using Serialize / Deserialize with type settings? I think many use cases require the flexibility to serialize the pipeline in a way that fits the client system's needs.

Author

Do you mean just using these names, or serializing to a byte array like most serialization does?
We hope the serialized String is human-readable so that end-users can easily understand and edit it. This makes it possible to change some params and rerun a pipeline without recompiling the code. Json seems to be a standard and friendly way to express the params of a stage.
On the other hand, the className is needed when restoring. It's currently appended to the json with the signWithClass() method in PipelineStageFactory. It would be hard to provide such a utility method if the serialized String were not in a standard json map format.
That's why we explicitly use 'json' in the method names and why we use json as the serialization result. But we do wonder whether json is a good choice for serialization. What do you think?
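The "signWithClass" idea described in this comment could look roughly like the sketch below. This is an illustration only: the exact JSON layout and the field name `stageClassName` (which does appear in the PR's error message) are assumptions about the format, not the actual flink-ml implementation.

```java
// Hedged sketch: wrap a stage's own param JSON in a standard JSON map that
// also records the class name, so a factory can restore the right class later.
class SignedJsonSketch {

    // Assumes stageJson is already a valid JSON object.
    public static String signWithClass(String stageJson, Class<?> stageClass) {
        return "{\"stageClassName\":\"" + stageClass.getName()
                + "\",\"stage\":" + stageJson + "}";
    }
}
```

Because the result stays a standard JSON map, a restore path can read `stageClassName` first and then hand the nested object to the right stage class.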

Contributor

Yes, I think this is also my confusion. Let me rephrase it differently:

  1. If this is only for human readability, why do we need both toJson and fromJson? Shouldn't toJson be enough?
  2. I can see that parameter editing is a very tunable part for ML users. I am no expert, but do we usually package param literals (e.g. the values) and models together? Can we load them differently? e.g. load the param json and the model json separately.
  3. As a personal preference, if the format can be a choice for the user (json, yaml, or other), that would be even better.

I think one large question I have is whether we use this for serving purposes. If we are not using this for serving purposes, I am actually fine with Json or Yaml.

@ex00 ex00 May 19, 2019

+1 to making it more flexible.
If we want to support other formats, e.g. PMML, for serialization/deserialization, then toJson/fromJson doesn't match the logic in this case.

I suggest moving the serialization/deserialization logic out of this interface into a separate object, and injecting the serializer/deserializer into the pipeline object.
I mean something like:

class ExampleTransformer implements Transformer<ExampleTransformer> {
    ....
    public ExampleTransformer(PipelineSerializer implementationForSpecificFormat) {
        this.serializer = implementationForSpecificFormat;
        ....
    }
    ....
    public String serialization() {    // renamed version of toJson()
        return this.serializer.serialize(this);
    }
    ..... // a similar approach with a deserializer
}

or move the serialization and deserialization methods out of the Pipeline objects entirely:

Transformer example = new ExampleTransformer();
// actions with ExampleTransformer
String serializedPipeline = new PipelineSerializer("supported_format_name_like_json").serialize(example);

I think this approach is more flexible for supporting different serialization/deserialization formats for pipelines, and these are changes we can make in the next PR if the Flink community approves this proposal.

Author

@rongrong @ex00 @blublinsky
Thank you all for so much attention.

I think most of your questions can be summarized into two points:
1. Why use JSON rather than standard model languages like PMML? What if users want to use another format?
2. How to serve a model?

In fact, Persistable is designed to be used inside FlinkML only. It's not only the storage format but also a part of the user interface, like SQL. We tried to design a SQL-like language to describe pipeline stages, but discarded it since it requires users to learn new things, which may be a disadvantage at the beginning of FlinkML. Though the json may be hard to write from scratch, it is friendly enough to read and edit. We tried to support these needs separately, but found that if the representation is defined to be reversible, it can meet both requirements.

It's also planned to support standard model languages. Let's take PMML as an example. Since the conversion is not reversible once a model is converted to PMML, and PMML may be hard-pressed to describe all kinds of pipeline stages, including pipelines, we didn't choose it as the unified storage format, but will provide a PMMLExportable interface instead. Models can implement it, and users can then convert their trained model to a PMML-format string or file.
As for serving, it's intended that Model.transform() does exactly what serving does. This means a user can easily serve a model trained by FlinkML: just save the model json, then use it in the serving job. It's also possible to use the trained model outside Flink if the model implements PMMLExportable. We can also provide a PMMLPredictor (extends Transformer) to serve an external PMML model. I also tried implementing FLIP-23, proposed by @blublinsky, on top of this API: I put the metadata of the updating model queue (like Kafka) in the params, then acquire the source in Model.transform(). Though the implementation is quite complex, it's universal and easy for users to use. We can do more to integrate FLIP-39 and FLIP-23 and provide a better experience for users.

This is what we thought about, and I hope it solves your problems. I would appreciate you letting me know if you have any questions or think there's a mistake.
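The export interface named in this comment could be as small as the sketch below. `PMMLExportable` is the name from the discussion, but the method name `exportToPMML` is an assumption for illustration only:

```java
// Hedged sketch of the one-way export interface described above.
interface PMMLExportable {
    // Returns the trained model as a PMML-format string. One-way: a model
    // exported to PMML is not expected to be restorable into a FlinkML stage.
    String exportToPMML();
}
```

Since it has a single abstract method, a model class just implements it alongside Model, and external tools consume the resulting PMML string.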

Contributor

Thanks for all the comments. They triggered broad and deep thinking on this interface and on the future plan to export/load the model in different formats. I have discussed this offline with @c4emmmm. The final plan looks good to me. @blublinsky, @walterddr, @ex00, please let us know if you have any further questions on the solution proposed by @c4emmmm.

Contributor

+1 to exposing a more extendable interface in the next PR.

I am still not fully convinced that "toJson" addresses all of the concerns that we raised, especially the ones @blublinsky raised, but I also have a strong feeling that this can be addressed in a future PR.
One fundamental factor keeping me in favor of the "toJson" interface is that a readable/editable format in which a model can be bi-directionally converted in and out of FlinkML is 100% a need in multiple scenarios. For this PR's purpose, I don't think we need to focus on addressing "all" scenarios. What do you guys think?

IMO the follow-up can be on "export", "serving", or even "standardizing" the model.

Contributor

Yes @walterddr, the "toJson" interface makes it easier to read/edit a persisted pipeline. We can continue to discuss/work on the design of model export/load in the next PR, and it will make the discussion much easier when there is a PR for model export ready for review.
I can understand why the Persistable interface introduced a lot of confusion here. I talked with @c4emmmm offline again, and here are our decisions:

  1. Delete the Persistable interface (it is a common interface, but it is currently only intended for the persistence of PipelineStage, which does not quite make sense), and leave the implementation of toJson and loadJson just in PipelineStage.
  2. Delete the PipelineStageFactory interface, as well as the stageMap in toJson and loadJson, as this contract is not ensured by the interface.


Thanks guys, this plan sounds good to me. Could you create a subtask for it in FLINK-12470?

Author

@ex00 @walterddr @blublinsky
You may have noticed that we sent a mail to flink-dev bringing up a discussion about export and import. Since we have discussed it here in the PR, I think you may be interested in the topic. We summarized some thoughts, together with some examples, in this doc:

https://docs.google.com/document/d/1B84b-1CvOXtwWQ6_tQyiaHwnSeiRqh-V96Or8uHqCp8/edit?usp=sharing

Please take a look if you have time.


} else if (s instanceof Transformer) {
t = (Transformer) s;
} else {
throw new RuntimeException(
Contributor

Any reason we only check and throw a RuntimeException before the last Estimator?
Shouldn't this check be applied to all items?

}

/**
* Returns the name of the parameter.
Contributor

It might be good to explain that the name needs to be unique within a pipeline (since it is used as the key of the paramMap).

Author

Thanks, I will add to the javadoc.

@SuppressWarnings("unchecked")
public <V> V get(ParamInfo<V> info) {
V value = (V) paramMap.getOrDefault(info.getName(), info.getDefaultValue());
if (value == null && !info.isOptional() && info.getDefaultValue() == null) {
Contributor

This is a bit confusing given your previous hasDefaultValue() javadoc. If hasDefaultValue() is added, and here the defaultValue cannot be null when it is not optional, it seems like:

info.hasDefaultValue() === (!info.isOptional() && info.getDefaultValue() == null)

Author

In my opinion, null may be a valid default value. Some estimators may treat a null value as a special case. It works like this: when getting a param which is not configured,
if hasDefaultValue, use defaultValue;
if !hasDefaultValue && isOptional, use null;
if !hasDefaultValue && !isOptional, throw an exception; this means the param must be set manually.
I will add @Nullable to the setter of defaultValue. Or do you have any questions about this behavior or this prerequisite (that null can be a valid param value)?
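The three-case lookup described in this comment can be sketched as below. This is a simplified stand-in for the real Params/ParamInfo classes: `hasDefaultValue` is modeled as an explicit flag so that null can be a legitimate default, and the constructor shape is an assumption for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the param resolution rules described above.
class ParamsSketch {
    static class ParamInfo<V> {
        final String name;
        final boolean optional;
        final boolean hasDefaultValue;
        final V defaultValue; // may legitimately be null

        ParamInfo(String name, boolean optional, boolean hasDefaultValue, V defaultValue) {
            this.name = name;
            this.optional = optional;
            this.hasDefaultValue = hasDefaultValue;
            this.defaultValue = defaultValue;
        }
    }

    private final Map<String, Object> paramMap = new HashMap<>();

    <V> void set(ParamInfo<V> info, V value) {
        paramMap.put(info.name, value);
    }

    @SuppressWarnings("unchecked")
    <V> V get(ParamInfo<V> info) {
        if (paramMap.containsKey(info.name)) {
            return (V) paramMap.get(info.name);   // explicitly configured
        }
        if (info.hasDefaultValue) {
            return info.defaultValue;             // may be null by design
        }
        if (info.optional) {
            return null;                          // optional and unset
        }
        throw new IllegalStateException("Param " + info.name + " must be set manually");
    }
}
```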

* Utility to restore a PipelineStage from a stage json.
*/
@PublicEvolving
public class MLStageFactory {
Contributor

How about just calling it PipelineStageFactory ?

Author

Good idea. I'll rename it.

@ex00 ex00 left a comment

Hi, thanks for your contribution. I left a few questions about the changes. Please take a look when you have time.


public Pipeline(List<PipelineStage> stages) {
this.stages = stages;
this.params = new Params();

How will params be used in this class? Can we set params from arguments?

Author

The params are currently unused, except as the return value of the getParams() method. All of the information the pipeline needs is in the PipelineStage list. Nor can we set the params of the stages via this params.
We did think about how to use this Params, like using it to store all the params and share them among the stages, but gave up for fear of conflicts. We are still looking for its proper usage.
Removing the WithParams interface from Pipeline means removing it from all basic stages, and some default implementations could no longer be provided. I prefer to maintain the status quo.


Thank you for the explanation.

* @return the aliases of the parameter
*/
public String[] getAlias() {
return new String[0];

probably better to use a variable?

Author

Yes, this is an omission. Since the constructors of ParamInfo are already chaotic, I will refactor this with a new ParamInfoFactory class and make the alias configurable.

public final class Pipeline implements Estimator<Pipeline, Pipeline>, Transformer<Pipeline>,
Model<Pipeline> {
private static final long serialVersionUID = 1L;
private List<PipelineStage> stages;

is it final?

flink-ml/pom.xml Outdated
<relativePath>..</relativePath>
</parent>

<artifactId>flink-ml</artifactId>
Contributor

Also, I think naming this module flink-ml might conflict with the one in flink-library/flink-ml, which is not a Scala-free project, thus Travis complains. It might be a good idea to change it to some other name?

Contributor

Thanks for the comments, @walterddr. Can you please suggest a proper name for this root module? I think flink-ml is the most appropriate one, and I prefer to rename the legacy flink-ml (flink-library/flink-ml) module to flink-dataset-ml (I raised this discussion on the ML; let us see what the others suggest). Or we can investigate whether it is possible to eliminate the Travis complaints by changing the verification scripts to scan the Scala packages by the obsolete package name: flink-library/flink-ml needs a scala suffix, while flink-ml does not.

Contributor

I actually like the idea of changing the current flink-ml to flink-dataset-ml or flink-mllib if we are treating flink-ml as a first-class citizen, similar to how we moved flink-table out of the library submodule. I think the top level should take the most generic name, flink-ml.

Member

I like the idea of renaming flink-library/flink-ml to flink-library/flink-mllib. However, do we need to keep it compatible? (IMO, it is rarely used.)

Btw, we have the same problems with flink-python and flink-library/flink-python. @sunjincheng121

Author

There's another module consisting of some Estimators and Transformers, which will be proposed after this PR. That module will be named 'flink-ml-lib', just as the API module is 'flink-ml-api'. If flink-library/flink-ml is renamed to flink-mllib, I'm afraid it'll be quite confusing.

Contributor

Backward compatibility and version support seem reasonable. Should we mark the current FlinkML as deprecated before we merge?
Also, per @wuchong's point, if we go down this path we might also want to send a poll to the user/dev mailing lists to make sure the community is OK with this.

Contributor

I started a discussion on the dev/user MLs to collect feedback. @zentol pointed out that we can just remove the legacy flink-ml package, as we do not expect any further active development on it. I think he is right. I will create another PR to delete it.

Contributor

+1 to remove if no response in the user-ml. thanks for taking care of this @shaoxuan-wang .

Contributor

I'd propose naming this module flink-ml-parent; I always find it odd if the same artifactId is used for completely different modules in different versions of a project. I believe this is an unnecessary source of confusion.

Contributor

@zentol agree, flink-ml-parent sounds good.

Member

@bowenli86 bowenli86 left a comment

Thanks for contributing this PR, and great to see new machine learning effort arriving at Flink!

We also need to create a licensing NOTICE file to declare bundled libs. In my experience, the community prefers to have it in place from the start, in case we forget it, which may delay the later release process. Please include that in this PR, or open a PR as an immediate follow-up. You can take a look at https://cwiki.apache.org/confluence/display/FLINK/Licensing and use flink-connector-hive's NOTICE file as an example.

<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
Member

gson is a pretty common lib that users have in their code. We may need to shade gson to avoid conflicts with user code. We can do it in this or another PR.

Contributor

+1. I was actually also wondering whether we can use the already-shaded jackson library in flink-shaded?

Author

It's good not to introduce a new dependency; I'll try to use jackson instead of gson.

Author

When handling maps with flexible value types (like the 'paramMap' in Params), jackson seems not as friendly as gson, which can return a JsonObject and parse each value with a different class hint. Since the flink-ml code mostly runs on the driver side, once rather than per record, ease of use is more important than efficiency, so I'd like to shade gson and use it. Do you have any ideas about better ways to use jackson? @walterddr

Member

+1 to using Jackson. There is an extra cost to maintaining a new shaded package for the community: we would need to put the shaded package into https://github.com/apache/flink-shaded and regularly update it there.

For better ways to use, you can refer to JsonRowDeserializationSchema and JsonUtils.
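For what it's worth, Jackson's tree model can also handle heterogeneous value maps much like gson's JsonObject: each field can be inspected and converted with its own "class hint". The JSON shape and names below are illustrative assumptions, not the actual flink-ml param format.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hedged sketch: reading a params map with mixed value types via Jackson's
// tree model, as an alternative to gson's JsonObject.
class ParamJsonSketch {
    static int readK(String json) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode root = mapper.readTree(json);
            // Navigate the tree and convert each value individually.
            return root.get("paramMap").get("k").asInt();
        } catch (java.io.IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```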

package org.apache.flink.ml.api.misc.persist;

/**
* An interface to allow PipelineStage persistence and reload. As of now, we are using JSON as
Member

nit:

Suggested change
* An interface to allow PipelineStage persistence and reload. As of now, we are using JSON as
* An interface to allow PipelineStage persist and reload. As of now, we are using JSON as

Author

ghost commented May 13, 2019

Example libs and tests have been pushed to a new branch called flink-ml-preview, including a kmeans estimator/model, two transformers, and a simple kmeans test case. Though there's trouble running this test case, it should be helpful for understanding how to develop ML libs on top of flink-ml's core API and how the whole flink-ml works. Please check it if necessary.

Author

ghost commented May 14, 2019

Thanks @Xpray, @walterddr, @bowenli86, @ex00 for your advice. I have pushed a new commit modifying Pipeline. The changes are:

  • Make stages final, and have getStages() return an immutable copy of the stages.
  • Creating a new pipeline from a pipelineStages list, and restoring from json, also go through appendStage(). (These make the stages editable only via the appendStage() method.)
  • Validate and record the index of the last estimator in appendStage(); then all stages are guaranteed valid, and there's no need to find the index of the last estimator repeatedly.

Member

@wuchong wuchong left a comment

Thanks for the contribution @c4emmmm, and glad to see this great step for machine learning.

Overall looks good to me. I left some suggestions below.

* An interface to allow PipelineStage persistence and reload. As of now, we are using JSON as
* format.
*/
public interface Persistable {
Member

@wuchong wuchong May 16, 2019

I have three concerns here.

  1. This interface forces the implementing class to have an empty constructor, and this is not mentioned in the javadoc.
  2. What's the JSON used for? Is it only used for restoring? If yes, does Java serialization satisfy this? If we can use Java serialization, we don't need the constructor limitation or the overhead of implementing this interface.
  3. We need some tests to verify that the implementations of toJson and loadJson work as expected.
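The contract under discussion is roughly the two-method interface below. This is a simplified sketch: `toJson`/`loadJson` are the method names mentioned in this thread, while `ConstantStage` and its toy JSON format are illustrative assumptions used only to show the round trip a test would verify.

```java
// Hedged sketch of the persistence contract. Note the implicit requirement
// called out above: implementations need a public no-arg constructor so they
// can be re-instantiated reflectively before loadJson() is called.
interface PersistableSketch {
    String toJson();
    void loadJson(String json);
}

// Toy implementation illustrating a toJson/loadJson round trip.
class ConstantStage implements PersistableSketch {
    String value = "";

    public String toJson() {
        return "{\"value\":\"" + value + "\"}";
    }

    public void loadJson(String json) {
        // Naive parse for the toy format above; a real stage would use a JSON library.
        int start = json.indexOf(":\"") + 2;
        value = json.substring(start, json.length() - 2);
    }
}
```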

Author

Thanks for your advice.
1. I'll add it to the javadoc.
2. Besides serialization, we hope there's a human-readable string describing the pipeline/stage that end-users can easily understand and edit. This makes it possible to change some params and rerun a pipeline without recompiling the code. I believe such a string is sufficient for serialization, and I chose json as the unified serialized format, which is standard and friendly.
But I'm also wondering whether json is a good choice; what do you think?
3. I'm working on adding tests, including this. It can be done today.

Member

Thanks for the explanation. It makes sense to me. I think it's fine to use JSON, because it's readable and there are a lot of json visualization tools on the web.

Contributor

+1 thanks for the explanation @c4emmmm.


The human-readability argument has been used many times, and it never quite works. It's fine to read a small model, but a large one?
I like Spark pipelines, which provide me with an API I can use to get exactly the info that I need, instead of sifting through a huge JSON string.

Author

@blublinsky
Sorry, I went through Spark ML but couldn't find exactly the API you mean. Since I've never really used Spark ML, I'm afraid I missed something. Could you point me to what you are referring to?


The overall proposal is very similar to https://spark.apache.org/docs/latest/ml-pipeline.html.
and https://spark.apache.org/docs/latest/ml-pipeline.html#how-it-works
There is also a set of predefined, commonly used transformers https://spark.apache.org/docs/latest/ml-features.html

What I was referring to is something like this:

val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println("Learned classification tree model:\n" + treeModel.toDebugString)

Author

Thanks for your explanation. I found this example code in Spark ML; it mainly uses the toDebugString method to acquire the info. But reading the code, I found that toDebugString is provided by only a few models, most of which are tree models.

Here are all the models I found that have this method or something like it:
DecisionTreeClassificationModel,
DecisionTreeRegressionModel,
RandomForest,
DecisionTreeModel,
GBTClassificationModel,
GBTRegressionModel

and an example debug string returned by this method looks like this:
RandomForestClassificationModel (uid=rfc_6c4ceb92ba78) with 20 trees
Tree 0 (weight 1.0):
 If (feature 0 <= 1.0)
  If (feature 10 <= 0.0)
   Predict: 0.0
  Else (feature 10 > 0.0)
   If (feature 12 <= 63.0)
    Predict: 0.0
   Else (feature 12 > 63.0)
    Predict: 0.0
 Else (feature 0 > 1.0)
  If (feature 13 <= 1.0)
   If (feature 3 <= 3.0)
    Predict: 0.0
   Else (feature 3 > 3.0)
    Predict: 1.0
  Else (feature 13 > 1.0)
   If (feature 7 <= 1.0)
    Predict: 0.0
   Else (feature 7 > 1.0)
    Predict: 0.0
Tree 1 ... (repeated for each of the 20 trees, with different weights)

The requirement to acquire the needed information from a pipeline is indeed valuable. If estimators or models could rephrase their params in a more friendly description, they would be easier to understand; I think that is the main value toDebugString provides. Maybe we should stipulate that all models have a friendly, well-defined toString() method, or even add a describe() method to the basic interfaces like PipelineStage.
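To make that describe() idea concrete, here is a minimal hypothetical sketch. The interface is simplified and the Normalizer stage is invented for illustration; none of this is the PR's actual API:

```java
public class Main {
    // Hypothetical, simplified stage interface: toJson() is the lossless,
    // machine-restorable description; describe() is a lossy, human-oriented one.
    interface PipelineStage {
        String toJson();

        default String describe() {
            return getClass().getSimpleName();
        }
    }

    // Invented example stage overriding describe() with a friendlier summary.
    static class Normalizer implements PipelineStage {
        public String toJson() {
            return "{\"stageClassName\":\"Normalizer\"}";
        }

        public String describe() {
            return "Normalizer(scales each feature to [0, 1])";
        }
    }

    public static void main(String[] args) {
        PipelineStage stage = new Normalizer();
        System.out.println(stage.describe());
    }
}
```

The default implementation keeps the method cheap to adopt: a stage that does not override describe() still reports its class name.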

But that is not the purpose of the JSON we are discussing. The JSON is designed to describe a pipeline completely: it should be possible to generate it from a pipeline and convert it back without information loss, which allows it to serve as a storage format. Its readability is mostly so that users can edit a pipeline without editing the code and recompiling, which makes it more convenient to tune or reuse a pipeline.

As for initial transformers: it is already planned that, once the API is accepted, the next PR will add some initial estimators, models and transformers. They will have well-defined toString methods that provide the information a user needs. More will be added after that, and hopefully more libraries will be contributed to enrich Flink ML.

flink-ml/pom.xml Outdated
<relativePath>..</relativePath>
</parent>

<artifactId>flink-ml</artifactId>
Member

I like the idea of renaming flink-library/flink-ml to flink-library/flink-mllib. However, do we need to keep it compatible? (IMO, it is rarely used.)

Btw, we have the same problem with flink-python and flink-library/flink-python. @sunjincheng121

public static <T extends Persistable> T createFromJson(String json) {
Gson gson = new Gson();
JsonObject jobj = gson.fromJson(json, JsonObject.class);
String className = jobj.get("stageClassName").getAsString();
Member

It would be better to check that the Class can be instantiated and throw an explicit exception. We can call InstantiationUtil.checkForInstantiation here.
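The suggested check can be sketched in plain Java as follows. This is a simplified, boolean-returning stand-in for Flink's InstantiationUtil (which throws an explicit exception instead), not the actual implementation:

```java
import java.lang.reflect.Modifier;

public class Main {
    // Fail fast on classes that reflection cannot instantiate, instead of
    // surfacing an opaque error later during deserialization.
    static boolean isInstantiable(Class<?> clazz) {
        if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
            return false; // interfaces and abstract classes cannot be instantiated
        }
        try {
            clazz.getDeclaredConstructor(); // requires a no-arg constructor
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isInstantiable(StringBuilder.class)); // concrete, has a no-arg constructor
        System.out.println(isInstantiable(Runnable.class));      // interface
    }
}
```

Doing this check before Class.newInstance-style calls turns a confusing reflection failure into a clear error message naming the offending class.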

Author

Thanks, I'll take it.

<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
Member

+1 to using Jackson. There is an extra cost to maintaining a new shaded package for the community: we would need to add it to https://github.com/apache/flink-shaded and update it there regularly.

For usage examples, you can refer to JsonRowDeserializationSchema and JsonUtils.

@ghost
Author

ghost commented May 16, 2019

Test cases are added, covering the behavior of pipelines and params, serializing and restoring a pipeline, and ExtractParamInfosUtil.

* <p>If there is no {@link Estimator} in the pipeline, the method returns a copy of this pipeline.
*/
public Pipeline fit(Table input) {
List<PipelineStage> transformStages = new ArrayList<>();

I suggest setting stages.size() as the initial capacity of the transformStages list.

/**
* Utility to extract all ParamInfos defined in a WithParams, mainly used in persistence.
*/
public class ExtractParamInfosUtil {

I think you can make this a final class.

try {
result.add((ParamInfo) f.get(s));
} catch (IllegalAccessException e) {
System.err.println("Failed to extract field:" + f.getName() + ", ignore it.");

I suggest using a logger to log the error here, instead of System.err.

/**
* Interface to describe a class with a string, only for pipeline test.
*/
public interface SelfDescribe {

I think all implementations of this interface are stubs, not mocks, because you have implemented some logic to check behavior. I may be mistaken; please correct me.

Author

I'm not sure about the difference between a stub and a mock. Let me look it up first.

The other advice seems reasonable; I'll take it. Thanks.

Contributor

@walterddr left a comment

Thanks for the update and the prompt feedback, @c4emmmm. I've replied to some of the comments and will give it another pass during the weekend. :-)

interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable,
Persistable {

default String toJson() {
Contributor

Yes, this is my confusion as well. Let me rephrase it:

  1. If this is only for human readability, why do we need both toJson and fromJson? Shouldn't toJson alone be enough?
  2. I can see that parameter editing is a very tunable part for ML users. I'm no expert, but do we usually package param literals (e.g. the values) and models together? Can we load them separately, e.g. load the param JSON and the model JSON separately?
  3. My personal preference: if the format could be a user choice (JSON, YAML, or other), that would be even better.

One large question I have is whether we will use this for serving purposes. If we are not using it for serving, I am actually fine with JSON or YAML.

* An interface to allow PipelineStage persistence and reload. As of now, we are using JSON as
* format.
*/
public interface Persistable {
Contributor

+1 thanks for the explanation @c4emmmm.

flink-ml/pom.xml Outdated
<relativePath>..</relativePath>
</parent>

<artifactId>flink-ml</artifactId>
Contributor

Backward compatibility and version support seem reasonable. Should we mark the current FlinkML as deprecated before we merge?
Also, per @wuchong's statement, if we go down this path we might want to send a polling mail to user/dev to make sure the community is OK with this.

@shaoxuan-wang shaoxuan-wang self-assigned this May 21, 2019
@ghost
Author

ghost commented May 22, 2019

As mentioned in the "[DISCUSS] FLIP-39: Flink ML pipeline and ML libs" thread on flink-dev, we have to add a TableEnvironment as an input argument to Estimator.fit() and Transformer.transform().

This is because it is sometimes necessary to register a function, and the environment may also be used to determine whether the pipeline is running in a batch or streaming environment, since some algorithms only support batch training. It is impossible to get the TableEnvironment from the input Table alone without a flink-table-planner dependency, which users should not touch, so we have to pass it in explicitly.

Another option is to get the TableEnvironment via a static method, but that is not safe: users may create a TableEnvironment without going through that method and cause unknown problems.

Adding this argument should not change the experience of users and library developers much. Please let me know if anyone has questions about this.
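A minimal sketch of the resulting signatures, using stub Table/TableEnvironment classes since the real Flink types are not available here; the generics of the actual interfaces are omitted:

```java
public class Main {
    // Stub stand-ins for the real Flink classes, used only so this sketch compiles.
    static class Table {}
    static class TableEnvironment {}

    // The change described above: fit/transform take the TableEnvironment
    // explicitly, since it cannot be recovered from a Table alone without a
    // planner dependency.
    interface Transformer {
        Table transform(TableEnvironment tEnv, Table input);
    }

    interface Estimator {
        Transformer fit(TableEnvironment tEnv, Table input);
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = new TableEnvironment();
        Table data = new Table();
        // Trivial estimator whose fitted "model" is an identity transformer.
        Estimator estimator = (env, in) -> (env2, in2) -> in2;
        Table out = estimator.fit(tEnv, data).transform(tEnv, data);
        System.out.println(out == data);
    }
}
```

Passing the environment explicitly also means a stage can check at fit/transform time which kind of environment it was given, which matters for batch-only algorithms.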

@ghost
Author

ghost commented May 22, 2019

Thanks for contributing this PR, and great to see new machine learning effort arriving at Flink!

We also need to create a licensing NOTICE file declaring bundled libs. In my experience, the community prefers to have it in place from the start, in case we forget it and delay a later release. Please include it in this PR, or open a PR as an immediate follow-up. You can take a look at https://cwiki.apache.org/confluence/display/FLINK/Licensing and use flink-connector-hive's NOTICE file as an example.

@bowenli86
Thanks for the heads-up. Following the advice to replace Gson with flink-shaded-jackson, I have refactored the code and changed the dependency. Since flink-ml-api now only depends on flink-table-api-java and flink-shaded-jackson, I think we no longer need the NOTICE file. Please let me know if I'm mistaken.

@shaoxuan-wang
Contributor

Thanks for the contribution, @c4emmmm. The entire design looks good to me. I have gone over all the comments, and it seems most of them have been addressed. There is one major comment about the way to persist and reload the model; I noticed that you have responded to the reviewers, but they have not yet confirmed whether they are happy with your solution.
In general I am +1 on this PR. I will merge it in the next 1-2 days if no further comments or concerns come up.

Contributor

@walterddr left a comment

Thanks for the update and the explanations, @c4emmmm @shaoxuan-wang. Sorry for taking so long to circle back on this. Overall I am +1 on getting this in and addressing some of the comments in future PRs.

interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable,
Persistable {

default String toJson() {
Contributor

+1 to exposing a more extendable interface in the next PR.

I am still not fully convinced that toJson addresses all of the concerns we raised, especially the ones @blublinsky raised, but I have a strong feeling that this can be addressed in future PRs.
One fundamental reason I support the toJson interface is that a readable/editable format, into and out of which a model can be converted bi-directionally, is definitely needed in multiple scenarios. For the purposes of this PR, I don't think we need to address all scenarios. What do you think?

IMO the follow-ups can cover exporting, serving, or even standardizing the model.

flink-ml/pom.xml Outdated
<relativePath>..</relativePath>
</parent>

<artifactId>flink-ml</artifactId>
Contributor

+1 to removing it if there is no response on the user mailing list. Thanks for taking care of this @shaoxuan-wang.

@sunjincheng121
Member

@flinkbot approve-until architecture

@sunjincheng121
Member

The CI failed; I restarted it!

Member

@sunjincheng121 left a comment

Sorry for jumping into the review, and many thanks for your effort on flink-ml, @c4emmmm!
I am not an expert on this part, so I just left a few comments about the Javadoc, plus two suggestions:


<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-ml-parent</artifactId>
Member

How about a name like flink-machineLearning or flink-ml, given that we already want to delete libraries/flink-ml per this discussion thread? http://mail-archives.apache.org/mod_mbox/flink-dev/201905.mbox/%3CCAAjCYUXy61SEouWVpX5nTJ6G2fwT76RAVtAOOf2XSVMHtzb_SQ@mail.gmail.com%3E

import org.junit.rules.ExpectedException;

/**
* Tests for pipeline behavior.
Member

Tests the behavior of {@link Pipeline}.

/**
* Test for the behavior and validator of Params.
*/
public class ParamsTest {
Member

Test for the behavior and validator of {@link Params}.

import org.junit.rules.ExpectedException;

/**
* Test for the behavior and validator of Params.
Member

Test for the behavior and validator of {@link Params}.

import java.util.List;

/**
* Test for ExtractParamInfosUtil.
Member

Test for {@link ExtractParamInfosUtil}.

*/
public interface InterfaceWithParamInfo<T extends InterfaceWithParamInfo>
extends WithParams<T> {
ParamInfo<String> KI = ParamInfoFactory.createParamInfo("KI", String.class)
Member

KI is never used, right? Can we remove it?

Author

KI is defined in an interface, which means it is implicitly public static final, and final variables must have an initial value, so the assignment cannot be omitted.
Also, the util is used to find all ParamInfos defined in a PipelineStage, its superclasses, and its implemented interfaces. KI is defined here to test the extraction from interfaces, so it cannot be removed either.
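A minimal illustration of the Java rule in question, with a plain String standing in for the ParamInfo&lt;String&gt; used in the test under review:

```java
public class Main {
    // Fields declared in an interface are implicitly public static final,
    // so they must be initialized where they are declared.
    interface InterfaceWithParamInfo {
        String KI = "KI"; // cannot be left without an initial value
    }

    public static void main(String[] args) {
        System.out.println(InterfaceWithParamInfo.KI);
    }
}
```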

package org.apache.flink.ml.api.misc.param;

/**
* Factory to create ParamInfo, all ParamInfos should be create via this class.
Member

be create -> be created?

@ghost
Author

ghost commented May 24, 2019

@sunjincheng121
Thanks for your review, and for the reminder to add flink-ml to flink-dist; I'll do that in later PRs. I'll also take the Javadoc suggestions.
As for the module name, I agree that we should avoid confusing users by using the same name for completely different modules, even across versions. But I'm also afraid that 'flink-ml-parent' is not a proper name; maybe we should have only one 'parent' in Flink. I'd like to gather more ideas before changing the module name again.

@shaoxuan-wang
Contributor

@sunjincheng121 @c4emmmm, I think it is OK to use parent as a suffix. We already have the flink-test-utils-parent module.

@shaoxuan-wang
Contributor

@flinkbot approve all

shaoxuan-wang pushed a commit to shaoxuan-wang/flink that referenced this pull request May 24, 2019
@asfgit closed this in 3050957 May 24, 2019