[FLINK-2157] [ml] Create evaluation framework for ML library #1849

Closed
thvasilo wants to merge 1 commit

Conversation

thvasilo

@thvasilo commented Apr 4, 2016

Using this PR instead of #871 due to rebase issues.

@thvasilo
Author

thvasilo commented Apr 4, 2016

@mbalassi @tillrohrmann

Closed the previous PR and opened this one for the evaluation framework, as I had some issues with rebasing.

@rawkintrevo
Contributor

Are there going to be usage docs on this?

@rawkintrevo
Contributor

Also, two quick issues:

pipelines

val scaler = MinMaxScaler()
val pipeline = scaler.chainPredictor(mlr)
val evaluationDS = survivalLV.map(x => (x.vector, x.label))

pipeline.fit(survivalLV)
scorer.evaluate(evaluationDS, pipeline).collect().head

When using this with a ChainedPredictor as the predictor I get the following error:
error: could not find implicit value for parameter evaluateOperation: org.apache.flink.ml.pipeline.EvaluateDataSetOperation[org.apache.flink.ml.pipeline.ChainedPredictor[org.apache.flink.ml.preprocessing.MinMaxScaler,org.apache.flink.ml.regression.MultipleLinearRegression],(org.apache.flink.ml.math.Vector, Double),Double]
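
For background on why the implicit lookup fails: Flink ML resolves pipeline operations through implicit type-class instances, so an evaluate call only compiles when an EvaluateDataSetOperation instance exists for the exact predictor type. The following is a minimal self-contained sketch of that pattern; all names here (EvaluateOp, Mlr, Scaler, Chained) are hypothetical stand-ins, not the real Flink ML API. It shows why a chained predictor needs its own derived instance:

```scala
// Minimal model of Flink ML's implicit-operation pattern.
// EvaluateOp, Mlr, Scaler and Chained are hypothetical stand-ins.
trait EvaluateOp[P, Testing, Prediction] {
  def evaluate(predictor: P, data: Seq[Testing]): Seq[(Prediction, Prediction)]
}

case class Mlr(weight: Double)                          // plays MultipleLinearRegression
case class Scaler(factor: Double)                       // plays MinMaxScaler
case class Chained[T, P](transformer: T, predictor: P)  // plays ChainedPredictor

object EvaluateOp {
  // Instance for the plain predictor: (features, label) in, (truth, prediction) out.
  implicit val mlrOp: EvaluateOp[Mlr, (Double, Double), Double] =
    new EvaluateOp[Mlr, (Double, Double), Double] {
      def evaluate(p: Mlr, data: Seq[(Double, Double)]): Seq[(Double, Double)] =
        data.map { case (x, y) => (y, p.weight * x) }
    }

  // Derived instance for the chained pipeline. Remove this definition and
  // the call below stops compiling with a "could not find implicit value
  // for parameter op" error, the same failure mode as the one quoted above.
  implicit def chainedOp[T, P](implicit inner: EvaluateOp[P, (Double, Double), Double])
      : EvaluateOp[Chained[T, P], (Double, Double), Double] =
    new EvaluateOp[Chained[T, P], (Double, Double), Double] {
      def evaluate(c: Chained[T, P], data: Seq[(Double, Double)]): Seq[(Double, Double)] =
        inner.evaluate(c.predictor, data)
    }
}

def evaluate[P, T, R](p: P, data: Seq[T])(implicit op: EvaluateOp[P, T, R]): Seq[(R, R)] =
  op.evaluate(p, data)

// With chainedOp in scope this resolves; without it, it would not compile.
val result = evaluate(Chained(Scaler(1.0), Mlr(2.0)), Seq((1.0, 3.0)))
```

In Flink ML the fix would presumably be an analogous implicit derivation covering ChainedPredictor for the (Vector, Double) testing type, or, per the later discussion, adding the missing scaler operations.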

MinMaxScaler()
Merging broke the following code for me:

val scaler = MinMaxScaler()
val scaledSurvivalLV = scaler.transform(survivalLV)

With the following error (omitting part of the stack trace):
Caused by: java.lang.NoSuchMethodError: breeze.linalg.Vector$.scalarOf()Lbreeze/linalg/support/ScalarOf;
at org.apache.flink.ml.preprocessing.MinMaxScaler$$anonfun$3.apply(MinMaxScaler.scala:156)
at org.apache.flink.ml.preprocessing.MinMaxScaler$$anonfun$3.apply(MinMaxScaler.scala:154)
at org.apache.flink.api.scala.DataSet$$anon$7.reduce(DataSet.scala:584)
at org.apache.flink.runtime.operators.chaining.ChainedAllReduceDriver.collect(ChainedAllReduceDriver.java:93)
at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

I'm looking for a workaround. Just noting that I found a regression. Other than that, it looks and works awesome, well done.

@thvasilo
Copy link
Author

Hello Trevor,

Thanks for taking the time to look at this, I'll investigate these issues
today hopefully.


@rawkintrevo
Contributor

rawkintrevo commented Apr 21, 2016

No problem. Also, re: my comment on the docs, I think I can lend a hand there (I was actually testing functionality to make sure I understood how it worked). Let me know if I can be of assistance.

Also, I did some more hacking this morning...

import org.apache.flink.api.scala._

import org.apache.flink.ml.preprocessing.StandardScaler
val scaler = StandardScaler() // MinMaxScaler()

import org.apache.flink.ml.evaluation.{RegressionScores, Scorer}
val loss = RegressionScores.squaredLoss
val scorer = new Scorer(loss)

import org.apache.flink.ml.regression.MultipleLinearRegression
val mlr = MultipleLinearRegression()
                            .setIterations(10)
                            .setConvergenceThreshold(0.001)

val pipeline = scaler.chainPredictor(mlr)
val evaluationDS = survivalLV.map(x => (x.vector, x.label))

pipeline.fit(survivalLV)
//pipeline.evaluate(survivalLV).collect()
scorer.evaluate(evaluationDS, pipeline).collect().head

This throws the breeze.linalg... error, so I'm not sure exactly what is different, but it seems breeze.linalg is close to the heart of the problem: it tries to use the pipeline but still trips up in the scaler.

@thvasilo
Author

thvasilo commented Apr 21, 2016

Well, Breeze was recently bumped to 0.12 in #1876; maybe that has something to do with it, but let's see.

Any chance you can try with the previous Breeze version?

Irrelevant: the Breeze version is still 0.11 in this branch.

@thvasilo
Author

thvasilo commented Apr 21, 2016

I did some testing and I think the problem has to do with the types that each scaler expects.

StandardScaler has fit and transform operations for DataSets of type Vector, LabeledVector, and (T <: Vector, Double), while MinMaxScaler does not provide one for (T <: Vector, Double). If you add the operations, the code runs fine (at least re: your first comment).

So I think this is a bug unrelated to this PR. The question becomes whether we want to support all three of these types. My recommendation would be to support Vector and LabeledVector only, and remove all operations that work on (Vector, Double) tuples. I will file a JIRA for that.

There is an argument to be made about whether some pre-processing steps are supervised (e.g. PCA vs. LDA), but under the strict definition of a transformer we shouldn't care about the label, only the features, so that operation can be implemented at the Transformer level.
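
A hedged sketch of that proposal, using made-up minimal types rather than the actual Flink ML classes: if a concrete transformer only defines how to transform the feature vector, the labeled case can be derived once at the Transformer level, with the label passed through untouched.

```scala
// Hypothetical sketch of "transform ignores the label".
// Vec, LabeledVec, Transformer and Doubler are illustrative names only.
case class Vec(values: Vector[Double])
case class LabeledVec(label: Double, vector: Vec)

trait Transformer {
  // The only operation a concrete transformer must provide.
  def transformVector(v: Vec): Vec

  // Derived once for every transformer: the label is never consulted,
  // so no per-transformer LabeledVector or (Vector, Double) variant
  // is needed.
  final def transformLabeled(lv: LabeledVec): LabeledVec =
    lv.copy(vector = transformVector(lv.vector))
}

// Example transformer that scales every feature by 2.
class Doubler extends Transformer {
  def transformVector(v: Vec): Vec = Vec(v.values.map(_ * 2))
}
```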

@rawkintrevo
Contributor

The transformer needs to scale the label too... I might not be correctly understanding your last paragraph / what you are proposing.

I agree with paragraph 1-3.

@gaborhermann
Contributor

Hi all,

What is the status of this PR?
It would be relevant for us, because we might like to use the evaluation framework proposed here. See FLINK-4713 for details.

Can I do anything to help resolve the issues you've been discussing here?

@thvasilo
Author

thvasilo commented Oct 4, 2016

@gaborhermann In terms of missing features, documentation is definitely missing, as @rawkintrevo mentioned.

For the issues mentioned in the JIRA issue you linked, I've replied on the dev list thread you started; all valid points re: adjusting this to handle recommendations.

@skonto

skonto commented Jan 17, 2017

Hey @thvasilo, is this under development? From what I see, many other tasks depend on it, right?

*
* @tparam PredictionType output type
*/
trait Score[PredictionType] {

What is the benefit of having the scores independent of the models? For example, each model could implement its own score function within its implementation class. I may be missing something here...

Author

The goal is to reduce code duplication, many models can share the same evaluation infrastructure.
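
As a simplified illustration of that point (hypothetical signatures, not the actual PR code): a score defined over (truth, prediction) pairs works with any model that produces predictions of that type, so e.g. squared loss is written once instead of once per regressor.

```scala
// Simplified sketch: Score depends only on the prediction type,
// not on any model class. Names here are illustrative.
trait Score[P] {
  def evaluate(pairs: Seq[(P, P)]): Double // (truth, prediction) pairs
}

// Written once, reusable by any model producing Double predictions.
val squaredLoss: Score[Double] = new Score[Double] {
  def evaluate(pairs: Seq[(Double, Double)]): Double =
    pairs.map { case (y, yHat) => (y - yHat) * (y - yHat) }.sum / pairs.size
}
```

Any regressor, linear or otherwise, can then be evaluated with the same squaredLoss instance, which is the duplication-reduction argument in a nutshell.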

@thvasilo
Author

Hello @skonto this PR will probably be subsumed by #2838, you can check out the latest development there.

@skonto

skonto commented Jan 20, 2017

@thvasilo thanks, I will have a look.

@gaborhermann
Contributor

Hi @skonto, I have not had time lately to finish up #2838, but I could clean it up next week. That said, I believe this PR could be merged separately from mine (evaluating ranking recommendations is a bit more complicated). As @thvasilo mentioned, the documentation is missing from his PR, but most of the work is already in place here. I could easily rebase my PR on top of this if you don't modify the class structure much. @thvasilo, what do you think?

@thvasilo
Author

Hello @gaborhermann. Personally I prefer to have PRs be as specific as possible, so I would recommend we try to get this merged before #2838, and then rebase that on master.

Given the committer load however this could take a while.

@skonto

skonto commented Jan 20, 2017

Hi guys, my intention was to review #2838 but my feeling is that it overlaps with this one. @thvasilo we can push this one first as you said so I will have a look at it and comment on it. The benefit is unblocking other tasks in this area which rely on the framework.

@gaborhermann
Contributor

Great. I agree this PR should be merged before #2838.
@skonto thanks for taking up the review :) This is indeed a bit blocking.
Hopefully I can improve upon #2838 next week, so by the time you get there, the PR could be ready (i.e. not in a WIP state).

@zentol
Contributor

zentol commented Feb 28, 2019

Closing since flink-ml is effectively frozen.
