Skip to content

Conversation

@xuyang1706
Copy link
Contributor

What is the purpose of the change

Add an implementation of pipeline's api

Brief change log

  • Add an implement PipelineStage, Estimator, Transformer, Model.
  • Add MLSession to hold the execution environment and others session shared variable.
  • Add AlgoOperator for the implementation of algorithms.
  • Add BatchOperator and StreamOperator based on AlgoOperator
  • *Add TableSourceBatchOp and TableSourceStreamOp *

Verifying this change

This change added tests and can be verified as follows:

  • run test case pass

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

@flinkbot
Copy link
Collaborator

flinkbot commented Jul 20, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 7505153 (Wed Oct 16 08:42:08 UTC 2019)

Warnings:

  • 1 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Jul 20, 2019

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build

Copy link
Contributor

@becketqin becketqin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xuyang1706 Thanks for the patch. I left some comments. In general it would avoid class name collision. We may also want to consider if the packaging can change a little bit. For example:

org.apache.flink.ml.
  .algooperator
      .AlgoOperator.java
      .stream
          .StreamAlgoOperator.java
          .source
               .TableSourceStreamAlgoOperator.java
      .batch
          .BatchAlgoOperator.java
          .source
               TableSourceBatchAlgoOperator.java

* @param <E> The class type of the {@link Estimator} implementation itself
* @param <M> class type of the {@link Model} this Estimator produces.
*/
public abstract class Estimator<E extends Estimator <E, M>, M extends Model <M>>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is in general an anti-pattern to have the duplicate class names. Can we change this to something like EstimatorBase?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed to EstimatorBase.

import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
* Abstract class for a estimator that fit a {@link Model}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Java doc does not seem to provide much information to the code readers. How about change it to the following:

The base class for estimator implementations. It sets a global static context of `MLSession` and prepare the input of the estimator for either batch execution or stream execution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed.

}

/**
* Train and produce a {@link Model} which fits the records in the given {@link Table}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to copy the java doc from the parent class if there is no additional information.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

* @param input the table with records to train the Model.
* @return a model trained to fit on the given Table.
*/
public M fit(Table input) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this method have to be public? If it has to be public, this method has the assumption that the MLSession has been setup. Should this be mentioned in the JavaDoc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to be public. We have refined the JavaDoc.

*/
@Override
public M fit(TableEnvironment tEnv, Table input) {
MLSession.setTableEnvironment(tEnv, input);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The global static MLSession requires that the environment is only set once. I am not sure if this restriction is too strong. From ML pipeline API's perspective, there is no such restriction. At very least, we should document this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your advice and offline discussion, we defined MLEnvironmentFactory to replace the global static MLSession.

* @param <S> The class type of the {@link PipelineStage} implementation itself, used by {@link
* org.apache.flink.ml.api.misc.param.WithParams} and Cloneable.
*/
public abstract class PipelineStage<S extends PipelineStage <S>>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, PipelineStageBase? This class does not implement the org.apache.flink.ml.api.core.PipelineStage, is this intended?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this class is a must. I think each type of pipeline stage individually can set its own base class. see: https://stackoverflow.com/a/3599379/11332462

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, PipelineStageBase? This class does not implement the org.apache.flink.ml.api.core.PipelineStage, is this intended?

I am not sure this class is a must. I think each type of pipeline stage individually can set its own base class. see: https://stackoverflow.com/a/3599379/11332462

It has been renamed and PipelineStage is the standard concept in pipeline and if it implements the WithParams , the subclasses could do not care about WithParams

* @param <T> The class type of the {@link Transformer} implementation itself, used by {@link
* org.apache.flink.ml.api.misc.param.WithParams}
*/
public abstract class Transformer<T extends Transformer <T>>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, TransformerBase?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx, changed

/**
* Base class of streaming algorithm operators.
*/
public abstract class StreamOperator<T extends StreamOperator <T>> extends AlgoOperator<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name StreamOperator is colliding with org.apache.flink.streaming.api.operators.StreamOperator which is a widely established class in Flink. Can we change it to something like StreamAlgoOperator? We may need to change the BatchOperator to BatchAlgoOperator as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your advice and offline discussion. In our ML algorithm implementations and applications, we need not use org.apache.flink.streaming.api.operators.StreamOperator, and we'd like to use BatchOperator in ML lib.

return getOutput().toString();
}

public <S extends StreamOperator> S link(S next) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious why link() and linkTo() methods require a template type while linkFrom() does not need that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

link() and linkTo() return the next AlgoOp, linkFrom() returns self AlgoOp.

import java.util.List;

/**
* Base class of streaming algorithm operators.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class seems needs some java doc for the public methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx, added

Copy link

@ex00 ex00 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @xuyang1706,
thanks for your work, I've just left a few comments, please look it then you will have time.

if (null != ins && ins.size() == 1) {
return linkFrom(ins.get(0));
} else {
throw new RuntimeException("Not support more than 1 inputs!");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementations of linkFrom(BatchOperator, BatchOperator), linkFrom(BatchOperator, BatchOperator,BatchOperator), linkFrom(List) looks inconsistent.
In methods for BatchOperators is created list of elements more that 1. but in result is throw exception that count elements should be 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We define the new method linkFrom(BatchOperator…) to replace them.

}

public T linkFrom(List <BatchOperator> ins) {
if (null != ins && ins.size() == 1) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is difference on this case from linkFrom(BatchOperator)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We define a new method linkFrom(BatchOperator…) to replace both of them.

@xuyang1706
Copy link
Contributor Author

xuyang1706 commented Sep 12, 2019

Hi @xuyang1706,
thanks for your work, I've just left a few comments, please look it then you will have time.

Hi @ex00 , thanks for your comments. We define the new method linkFrom(BatchOperator…), it can support one, two or many inputs.

Copy link
Contributor

@walterddr walterddr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution @xuyang1706 and sorry for joining the discussion late. Please see some of my comments. In general I think there are lots of added APIs that we should carefully document. Please let me know what you guys think ;-) thanks -Rong

* Base class for algorithm operators.
* @param <T> The class type of the {@link AlgoOperator} implementation itself
*/
public abstract class AlgoOperator<T extends AlgoOperator <T>> implements WithParams<T>, Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another question. based on what I understand of this PR. shouldn't it be

public abstract class AlgoOperator<T> extends PipelineStage<T> {
  // ...
}

return next;
}

public abstract T linkFrom(BatchOperator<?>... inputs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new abstract API. Please provide JavaDoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, added intro and example.
For there is no answer area of the above question, I put the answer here:
PipelineStage only supports single input and single output, it is the basic unit for pipeline. AlgoOperator supports multi-input and multi-output. We’d like to implement the algorithm with AlgoOperator, and PipelineStage’s fit and transform function can call the AlgoOperator.

/**
* Base class of batch algorithm operators.
*/
public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please provide java doc for link linkTo and linkFrom.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, provided.

}

public <B extends BatchOperator<?>> B linkTo(B next) {
next.linkFrom(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the rational of making link in this direction? what if downstream operator calls linkFrom twice with the same upstream; or twice with different upstreams?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the rational of making link in this direction? what if downstream operator calls linkFrom twice with the same upstream; or twice with different upstreams?

thx

  1. with this method, the operator can link from source to sink if every operator in the dag is single input and single output and it is straightforward that execute in order from front to back
  2. It is not recommended to linkFrom itself or link the same group inputs twice(added to javadoc) and the implement of the operator will define the behavior


@Override
public TableSourceBatchOp linkFrom(BatchOperator<?>... inputs) {
throw new UnsupportedOperationException("Not supported.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more concise error message: "Table source operator should not have any upstream to link from"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this message is more concise.

* @param <S> The class type of the {@link PipelineStage} implementation itself, used by {@link
* org.apache.flink.ml.api.misc.param.WithParams} and Cloneable.
*/
public abstract class PipelineStage<S extends PipelineStage <S>>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this class is a must. I think each type of pipeline stage individually can set its own base class. see: https://stackoverflow.com/a/3599379/11332462

…MLEnvironmentFactory that create the new MLEnvironment
@xuyang1706
Copy link
Contributor Author

@xuyang1706 Thanks for the patch. I left some comments. In general it would avoid class name collision. We may also want to consider if the packaging can change a little bit. For example:

org.apache.flink.ml.
  .algooperator
      .AlgoOperator.java
      .stream
          .StreamAlgoOperator.java
          .source
               .TableSourceStreamAlgoOperator.java
      .batch
          .BatchAlgoOperator.java
          .source
               TableSourceBatchAlgoOperator.java

@becketqin , thanks for your advice. we defined MLEnvironmentFactory to replace the global static MLSession and support to set multiple Environment, refactored the class names and refined the javaDoc.

@xuyang1706
Copy link
Contributor Author

Thanks for the contribution @xuyang1706 and sorry for joining the discussion late. Please see some of my comments. In general I think there are lots of added APIs that we should carefully document. Please let me know what you guys think ;-) thanks -Rong

@walterddr , thanks for your comments. We added more description on the concepts and APIs, and refactored the core functions of link, linkTo and linkFrom.

Copy link

@ex00 ex00 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @xuyang1706 thanks for update,
I've left additional comments, please look they.

*/
public static AlgoOperator<?> sourceFrom(Table table) {
if (((TableImpl) table).getTableEnvironment() instanceof StreamTableEnvironment) {
return new TableSourceStreamOp(table);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the cyclic dependency

org.apache.flink.ml.streamoperator.source.TableSourceStreamOp extends org.apache.flink.ml.streamoperator.StreamOperator

org.apache.flink.ml.streamoperator.StreamOperator<T extends StreamOperator<T>> extends org.apache.flink.ml.common.AlgoOperator

in org.apache.flink.ml.common.AlgoOperator import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, removed.

/**
* Construct the operator with empty Params.
*/
protected AlgoOperator() {
Copy link

@ex00 ex00 Sep 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably need only one default constructor with not nullable Table argument and nullable Params

big part of methods in this class depends on output field

public AlgoOperator(Params params, Table table) {
		if (null == table) {
			throw new IllegalArgumentException('Table shoud be non null');
		}
		this.output = table;

if (null == params) {
			this.params = new Params();
		} else {
			this.params = params.clone();
		}
	}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Table is the output of the operator. It should be set at linkFrom and call getOutput to get the output for the next linked operator in most of the scenes. So the AlgoOperator() and the AlgoOperator(Params params) should be exist.
The TableSourceBatchOp and the TableSourceStreamOp is the only two cases that construct the Operator using the table. These operators are the special cases and need not add a new constructor in the base class for these cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main point here is that since the public AlgoOperator() API is not used/tested. we can add it in from later PRs, correct me if I were wrong @ex00 ?
If that's the case then yes I do agree that any new APIs added which are not tested should be part of latter PRs.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes @walterddr, you are right. I don't see how using this constructor here.
Also, personally for me, logic in parent class, when using output field and it is not marked 'as required', is confusing a little bit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In later PRs, these APIs will be used and tested.

For how using this constructor, I'd like to give an example.
SplitBatchOp is widely used in ML data pre-preprossing, which splits one dataset input 2 dataset: training set and validation set. It is very convenient for us to write code like this:
new SplitBatchOp().setSplitRatio(0.9)

For output field. The AlgoOperator may have one or more result tables, in most cases, it has only one result. The output is the main operation result table, and the other results are kept in the sideOutputs.
For example, 2 AlgoOperators: AlgoA and AlgoB, AlgoB takes the AlgoA’s results as its inputs, we can write the code like this:
AlgoB.linkFrom(AlgoA)
AlgoA.getOutput() provides the main result of AlgoA, and AlgoA.getSideOutputs() provides the other results of AlgoA. AlgoB will take the AlgoA’s results as its inputs by calling AlgoA.getOutput() and AlgoA.getSideOutputs().

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new SplitBatchOp().setSplitRatio(0.9)

How will it be apply to data source and how will define output result? Could you explain more please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, we have the setOutput and can set the output in any method of the operator

Copy link

@ex00 ex00 Sep 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it mean what it will be look like new SplitBatchOp().setSplitRatio(0.9).setOutput(table) ?
if it is right assumption, that means that who will write code need to know that must call setOutput. And if someone will doing something from you example, it is easier to lost this call, and exception will be throw in run time only

AlgoA = new SplitBatchOp().setSplitRatio(0.9)
AlgoB.linkFrom(AlgoA)
AlgoA.getSchema()  // NPE: output is null

But if we define that SplitBatchOp(table) and developer don't thing about additional method calls and will get error on compile time in wrong case.
And as result in each operator we need to implement constructor with table param, why is not doing on parent class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there must be one or more data sources. We have defined some common *SourceBatchOp to get the source data conveniently, and from the sourceBatchOp, we can link the algorithm operations.
Here is a whole example:

CsvSourceBatchOp algoA =
	new CsvSourceBatchOp()
		.setFilePath("http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv")
		.setSchemaStr("s_length double, s_width double, p_length double, p_width double, category string");

System.out.println("schema of algoA:");
System.out.println(algoA.getSchema());

SplitBatchOp algoB = new SplitBatchOp().setFraction(0.8);

algoB.linkFrom(algoA);

System.out.println("schema of algoB's main result:");
System.out.println(algoB.getSchema());

System.out.println("schema of algoB's side output result:");
System.out.println(algoB.getSideOutput().getSchema());

And the print results:

schema of algoA:
root
 |-- s_length: Double
 |-- s_width: Double
 |-- p_length: Double
 |-- p_width: Double
 |-- category: String

schema of algoB's main result:
root
 |-- s_length: Double
 |-- s_width: Double
 |-- p_length: Double
 |-- p_width: Double
 |-- category: String

schema of algoB's side output result:
root
 |-- s_length: Double
 |-- s_width: Double
 |-- p_length: Double
 |-- p_width: Double
 |-- category: String

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now I see, thanks for the explanation

*/
public final class TableSourceBatchOp extends BatchOperator<TableSourceBatchOp> {

public TableSourceBatchOp(Table table) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic could be moved to parent, it same as in TableSourceStreamOp

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refer to #9184 (comment)

@Override
public Params getParams() {
if (null == this.params) {
this.params = new Params();
Copy link

@ex00 ex00 Sep 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate logic from constructor.
If we cant set params as null we don't need check all time it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your advice, removed the check process.

@xuyang1706
Copy link
Contributor Author

Hi @xuyang1706 thanks for update,
I've left additional comments, please look they.

@ex00 , thanks for your advice, I have refactored the sourceFrom() method and removed the unnecessary check process.

add more comments in MLEnvironmentFactory
move the initialize of MLEnvironment to getNewMLEnvironmentId in MLEnvironmentFactory
Copy link
Contributor

@walterddr walterddr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the follow up @xuyang1706 . Overall the changes looks good to me. I left some minor comments. Also I notices that some large trunk of code was missing tests? were we going to support individual tests in the future, or they are part of the MLSessionTest?

*
* <p>This class is extended to support the data transmission between the BatchOperators.
*/
public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one suggestions I have is to put the operators into a single package
for example

|--operator
   |--AlgoOperator.java
   |--stream
      |--StreamOperator.java
      |--source
         |--TableSourceStreamOperator.java
   |--batch
      |--BatchOperator.java
      |--source
         |--TableSourceBatchOperator.java

IMO this is more clear than putting AlgoOperator in common. and others scatter around the package directory, what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @becketqin , @walterddr , @ex00 , I have refactored the package paths according your suggestions.

this.streamEnv = ((StreamTableEnvironmentImpl) streamTableEnv).execEnv();
}
}
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would still put a check here

else if (tEnv instanceof BatchTableEnvironment) {
  //...
} else {
  throw new IllegalArgumentEception(...);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your advice, added.

* @param <E> The class type of the {@link EstimatorBase} implementation itself
* @param <M> class type of the {@link ModelBase} this Estimator produces.
*/
public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might've been wrong, but looks like EstimatorBase is not tested?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as other Pipeline stages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, we have add some cases, and we will add more cases with the extended class.

/**
* Construct the operator with empty Params.
*/
protected AlgoOperator() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main point here is that since the public AlgoOperator() API is not used/tested. we can add it in from later PRs, correct me if I were wrong @ex00 ?
If that's the case then yes I do agree that any new APIs added which are not tested should be part of latter PRs.

add more case
add more comments in AlgoOperator
@xuyang1706
Copy link
Contributor Author

Thanks for the follow up @xuyang1706 . Overall the changes looks good to me. I left some minor comments. Also I notices that some large trunk of code was missing tests? were we going to support individual tests in the future, or they are part of the MLSessionTest?

Thanks @walterddr , I refactored the package paths, added some test cases and refined the JavaDoc with examples.

Copy link
Contributor

@becketqin becketqin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xuyang1706 Thanks for updating the patch. I left a few more comments. Can you take a look?

/**
* Base class of batch algorithm operators.
*
* <p>This class is extended to support the data transmission between the BatchOperators.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class extends {@link AlgoOperator} to support data transmission between BatchOperators.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed

/**
* Abbreviation of {@link #linkTo(BatchOperator)}.
*/
public <B extends BatchOperator<?>> B link(B next) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to have this alias method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, removed the linkTo

* }
* </pre>
*
* <p>the <code>c</code> in upper code is the linked
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The BatchOperator <code>c</code> in the above code is the same instance as <code>b</code> which takes <code>a</code> as its input. Note that BatchOperator <code>b</code> will be changed to link from BatchOperator <code>a</code>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed

* <p>the <code>c</code> in upper code is the linked
* <code>b</code> which use <code>a</code> as input.
*
* @param next the linked BatchOperator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The operator that will be modified to add this operator to its input.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed

/**
* Link to another {@link BatchOperator}.
*
* <p>Link the <code>next</code> to BatchOperator using this as its input.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Link the <code>next</code> BatchOperator using this BatchOperator as its input.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed

* @param input the table with records to train the Model.
* @return a model trained to fit on the given Table.
*/
public M fit(Table input) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this method have to be public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this method is more convenient with the MLEnvironmentFactory, so I think it should be public

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am actually thinking that with the MLEnvironmentFactory whether we should replace void fit(TableEnvironment, Table) method with this one. That would make the API much consistent and easy to understand. Given that there is no users of the MLPipeline interface yet, doing it now looks the best timing. But we do need a FLIP for such API change.

import org.apache.flink.ml.params.shared.HasMLEnvironmentId;

/**
* The base class for a stage in a pipeline, either an [[Estimator]] or a [[Transformer]].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the java doc format for the classes are not quite consistent. Sometimes it is Class, sometimes Class, and here it is [[Class]]. Can we make them consistent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed

/**
* The base class for a stage in a pipeline, either an [[Estimator]] or a [[Transformer]].
*
* <p>Each pipeline stage is with parameters, and requires a public empty constructor for
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PipelineStageBase maintains the parameters for the stage. A default constructor is needed in order to restore a pipeline stage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed

/**
* The base class for transformer implementations.
*
* @param <T> The class type of the {@link TransformerBase} implementation itself, used by {@link
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class type of the {@link TransformerBase} implementation itself =>

A subclass of {@link TransformerBase}, used by ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed

/**
* Test cases for MLEnvironment.
*/
public class MLEnvironmentTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks that there is no assertion in this test class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed

@xuyang1706
Copy link
Contributor Author

xuyang1706 commented Sep 29, 2019

@xuyang1706 Thanks for updating the patch. I left a few more comments. Can you take a look?

Thanks @becketqin, I have refined the code according your comments.

Copy link
Contributor

@becketqin becketqin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xuyang1706 Thanks for updating the patch. I left a few more comments. Most of them are minor. One major thing is I am not sure whether we should completely remove fit(TableEnvironment, Table) and transform(TableEnvironment, Table) API, because they are completely bypassed in the current implementation given the introduction of MLEnvironmentFactory.

public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;

/**
* A 'id' is a unique identifier of a MLEnvironment.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A monotonically increasing id for the MLEnvironments. Each id uniquely identifies an MLEnvironment.
Nit: Maybe can change the variable name to nextId?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed

*/
public StreamExecutionEnvironment getStreamExecutionEnvironment() {
if (null == streamEnv) {
streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we set the private instance variables in the constructor? This way all the instance variables can be final.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, as we discussed offline, for depending an unfixed bug, we will use current implementation as a temp workaround.

if (mlEnvId.equals(DEFAULT_ML_ENVIRONMENT_ID)) {
setDefault(new MLEnvironment());
} else {
throw new IllegalArgumentException("There is no Environment in factory. " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create the default ML Session in the static block? This will help simplify the logic here.

Should we also include the EnvId in the error message? Something like:
String.format("Cannot find MLEnvironment for MLEnvironmentId %s. Did you get the MLEnvironmentId by calling getNewMLEnvironmentId?", mlEnvId)`

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your advice, we refactored code to create the default ML Session in the static block and changed the exception message.

*
* @param env the MLEnvironment
*/
public static synchronized void setDefault(MLEnvironment env) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is custom default MLEnvironment necessary? What if users used one default MLEnv and later on changed to another? This could introduce some unexpected problems.

Should users just create their own MLEnvironmentID and MLEnvironment in that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I have checked the existence of default MLEnv, user could only set one default MLEnv.

/**
* Returns the column names of the output table.
*/
public String[] getColNames() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious how would users get the schema of the side output tables?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, added

+ size + ", current: " + inputs.length);
}

protected void checkMinOpSize(int size, StreamOperator<?>... inputs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed

* @param input the table with records to train the Model.
* @return a model trained to fit on the given Table.
*/
public M fit(Table input) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am actually thinking that with the MLEnvironmentFactory whether we should replace void fit(TableEnvironment, Table) method with this one. That would make the API much consistent and easy to understand. Given that there is no users of the MLPipeline interface yet, doing it now looks the best timing. But we do need a FLIP for such API change.

}

@Override
public Table transform(TableEnvironment tEnv, Table input) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do a sanity check to make sure the tEnv passed in is the same table environment used by the input table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with replace void fit(TableEnvironment, Table) method with the MLEnvironmentFactory, and I suggest we could discuss and change API in another PR.

}

@Override
public M fit(TableEnvironment tEnv, Table input) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do a sanity check to make sure the tEnv passed in is the same one used by the input table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto above.

Comment on lines 44 to 64
public void testConstructWithBatchEnv() {
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(executionEnvironment);

MLEnvironment mlEnvironment = new MLEnvironment(executionEnvironment, batchTableEnvironment, null, null);

Assert.assertSame(mlEnvironment.getExecutionEnvironment(), executionEnvironment);
Assert.assertSame(mlEnvironment.getBatchTableEnvironment(), batchTableEnvironment);
}

@Test
public void testConstructWithStreamEnv() {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);

MLEnvironment mlEnvironment = new MLEnvironment(null, null, streamExecutionEnvironment, streamTableEnvironment);

Assert.assertSame(mlEnvironment.getStreamExecutionEnvironment(), streamExecutionEnvironment);
Assert.assertSame(mlEnvironment.getStreamTableEnvironment(), streamTableEnvironment);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we only expect the MLEnvironment to be constructed in this two ways, maybe we can just have those two specific constructors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed

…ns in

operators to static, add the getter of side-outputs's schema
@xuyang1706
Copy link
Contributor Author

@xuyang1706 Thanks for updating the patch. I left a few more comments. Most of them are minor. One major thing is I am not sure whether we should completely remove fit(TableEnvironment, Table) and transform(TableEnvironment, Table) API, because they are completely bypassed in the current implementation given the introduction of MLEnvironmentFactory.

Thanks, @becketqin . I agree with replace void fit(TableEnvironment, Table) method with the MLEnvironmentFactory, and I suggest we could discuss and change the API in another PR.

@becketqin
Copy link
Contributor

@xuyang1706 Thanks for updating the patch. the patch LGTM overall. I created a PR with some minor improvements against your branch. Can you check if that makes sense? If so, feel free to merge it to your branch.

@walterddr Do you also want to take another look? I am thinking of merging the patch either on Sunday or Monday. Thanks.

Copy link
Contributor

@walterddr walterddr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution @xuyang1706 and the reviews @becketqin . Overall it looks good to me. I only left a minor comment. please kindly take a look.
+1 to merge to unblock future developments.

* @param mlEnvId the id.
* @return the removed MLEnvironment
*/
public static synchronized MLEnvironment remove(Long mlEnvId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when would this API be called? seems like it is not necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MLEnviromentFactory can create multi MLEnviroment, if some of them only used in a time slot, user can use this API to release the resource. In most cases, user just use default MLEnviroment. This API is rarely used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is one minor bug here. The default MLEnvironment should never be removed. I'll fix this when checking in the code. Thanks.

@xuyang1706
Copy link
Contributor Author

Thanks for the contribution @xuyang1706 and the reviews @becketqin . Overall it looks good to me. I only left a minor comment. please kindly take a look.
+1 to merge to unblock future developments.

Thanks @walterddr. Yes, this API is rarely used, but in some cases, if user want to release no-used MLEnviroment in the process, this is only API could be called. Thus, I prefer to keep it.

@xuyang1706
Copy link
Contributor Author

@xuyang1706 Thanks for updating the patch. the patch LGTM overall. I created a PR with some minor improvements against your branch. Can you check if that makes sense? If so, feel free to merge it to your branch.

@walterddr Do you also want to take another look? I am thinking of merging the patch either on Sunday or Monday. Thanks.

Thanks for your help @becketqin. I have merged your "minor improvement PR" to my branch.

@becketqin
Copy link
Contributor

Merged to master.

@becketqin becketqin closed this Oct 14, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants