From 305b43a451af3d8bc859671476c215308fbfc7fc Mon Sep 17 00:00:00 2001 From: mikiobraun Date: Mon, 22 Jun 2015 17:04:42 +0200 Subject: [PATCH 01/16] Adding some first loss functions for the evaluation framework --- .../org/apache/flink/ml/evaluation/Loss.scala | 67 ++++++++++++++++ .../scala/org/apache/flink/ml/package.scala | 7 +- .../apache/flink/ml/evaluation/LossTest.scala | 77 +++++++++++++++++++ 3 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Loss.scala create mode 100644 flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/LossTest.scala diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Loss.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Loss.scala new file mode 100644 index 0000000000000..babae76ba6c0f --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Loss.scala @@ -0,0 +1,67 @@ +package org.apache.flink.ml.evaluation + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml._ + +import scala.reflect.ClassTag + +/** + * Loss function + * + * Takes a hole data set and then computes the score on them (obviously, again encoded in a DataSet) + * + * @tparam Y output type + */ +trait Loss[Y] { + def loss(trueAndPredicted: DataSet[(Y, Y)]): DataSet[Double] +} + +/** + * Loss functions expressible as a mean of a function taking output pairs as input + * + * @param lossFct function to apply to all elements + * @tparam Y output type + */ +class MeanLoss[Y: TypeInformation: ClassTag](lossFct: (Y, Y) => Double) + (implicit yyt: TypeInformation[(Y, Y)]) +extends Loss[Y] with Serializable { + def loss(trueAndPredicted: DataSet[(Y, Y)]): DataSet[Double] = + trueAndPredicted.map(yy => lossFct(yy._1, yy._2)).mean() +} + + +object Loss { + /** + * Squared loss function + * + * returns (y1 - y2)' + * + * @return a Loss object + */ + def squaredLoss = new MeanLoss[Double]((y1,y2) => (y1 - y2) * (y1 - y2)) + + /** + * Zero One Loss Function + * + * returns 1 if outputs differ and 0 if they are equal + * + * @tparam T output type + * @return a Loss object + */ + def zeroOneLoss[T: TypeInformation: ClassTag] = new MeanLoss[T]((y1, y2) => if (y1 == y2) 0 else 1) + + /** + * Zero One Loss Function also usable for score information + * + * returns 1 if sign of outputs differ and 0 if the signs are equal + * + * @return a Loss object + */ + def zeroOneSignumLoss = new MeanLoss[Double]({ (y1, y2) => + val sy1 = scala.math.signum(y1) + val sy2 = scala.math.signum(y2) + if (sy1 == sy2) 0 else 1 + }) + +} diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala index 554e155201045..5d73ab47517f8 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala @@ -21,7 +21,7 @@ package org.apache.flink import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.operators.DataSink -import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} +import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.ml.common.LabeledVector @@ -70,6 +70,11 @@ package object ml { dataSet.map(new BroadcastSingleElementMapperWithIteration[T, B, O](dataSet.clean(fun))) .withBroadcastSet(broadcastVariable, "broadcastVariable") } + + def mean()(implicit num: Numeric[T], ttit: TypeInformation[(T, Int)]): DataSet[Double] = + dataSet.map(x => (x, 1)) + .reduce((xc, yc) => (num.plus(xc._1, yc._1), xc._2 + yc._2)) + .map(xc => num.toDouble(xc._1) / xc._2) } private class BroadcastSingleElementMapper[T, B, O]( diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/LossTest.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/LossTest.scala new file mode 100644 index 0000000000000..f78ac0c56f6b4 --- /dev/null +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/LossTest.scala @@ -0,0 +1,77 @@ +package org.apache.flink.ml.evaluation + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.data.ToyData +import org.apache.flink.ml.math.DenseVector +import org.apache.flink.ml.regression.SimpleLeastSquaresRegression +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{FlatSpec, Matchers} + + +class LossTest + extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "Loss Functions" + + it should "work for squared loss" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val yy = env.fromCollection(Seq((0.0, 1.0), (0.0, 0.0), (3.0, 5.0))) + + val loss = Loss.squaredLoss + + val result = loss.loss(yy).collect() + + result.length shouldBe 1 + result.head shouldBe (1.6666666666 +- 1e-4) + } + + it should "work for zero one loss" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val yy = env.fromCollection(Seq("a" -> "a", "a" -> "b", "b" -> "c", "d" -> "d")) + + val loss = Loss.zeroOneLoss[String] + + val result = loss.loss(yy).collect() + + result.length shouldBe 1 + result.head shouldBe (0.5 +- 1e9) + } + + it should "work for zero one loss applied to signs" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val yy = env.fromCollection(Seq[(Double,Double)](-2.3 -> 2.3, -1.0 -> -10.5, 2.0 -> 3.0, 4.0 -> -5.0)) + + val loss = Loss.zeroOneSignumLoss + + val result = loss.loss(yy).collect() + + result.length shouldBe 1 + result.head shouldBe (0.5 +- 1e9) + } + + it should "work with a slightly more involved case with linear regression" in { + val env = ExecutionEnvironment.getExecutionEnvironment + val center = DenseVector(1.0, -2.0, 3.0, -4.0, 5.0) + val weights = DenseVector(2.0, 1.0, 0.0, -1.0, -2.0) + val n = 1000 + val noise = 0.5 + val ds = env.fromCollection(ToyData.singleGaussianLinearProblem(n, center, weights, noise)) + + val slr = new SimpleLeastSquaresRegression + slr.fit(ds) + + val test = ds.map(x => (x.vector, x.label)) + + val labels = slr.evaluate(test) + + val error = Loss.squaredLoss.loss(labels) + val expectedError = noise*noise + + error.collect().head shouldBe (expectedError +- expectedError/5) + } +} From bdb1a6912d2bcec29446ca4a9fbc550f2ecb8f4a Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Tue, 23 Jun 2015 16:07:48 +0200 Subject: [PATCH 02/16] Scorer for evaluation --- .../org/apache/flink/ml/evaluation/Loss.scala | 67 ------------- .../apache/flink/ml/evaluation/Score.scala | 98 +++++++++++++++++++ .../apache/flink/ml/evaluation/Scorer.scala | 44 +++++++++ .../apache/flink/ml/pipeline/Predictor.scala | 11 +-- .../{LossTest.scala => ScoreTest.scala} | 40 ++++++-- .../flink/ml/evaluation/ScorerITSuite.scala | 57 +++++++++++ 6 files changed, 234 insertions(+), 83 deletions(-) delete mode 100644 flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Loss.scala create mode 100644 flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala create mode 100644 flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Scorer.scala rename flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/{LossTest.scala => ScoreTest.scala} (58%) create mode 100644 flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Loss.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Loss.scala deleted file mode 100644 index babae76ba6c0f..0000000000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Loss.scala +++ /dev/null @@ -1,67 +0,0 @@ -package org.apache.flink.ml.evaluation - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ -import org.apache.flink.ml._ - -import scala.reflect.ClassTag - -/** - * Loss function - * - * Takes a hole data set and then computes the score on them (obviously, again encoded in a DataSet) - * - * @tparam Y output type - */ -trait Loss[Y] { - def loss(trueAndPredicted: DataSet[(Y, Y)]): DataSet[Double] -} - -/** - * Loss functions expressible as a mean of a function taking output pairs as input - * - * @param lossFct function to apply to all elements - * @tparam Y output type - */ -class MeanLoss[Y: TypeInformation: ClassTag](lossFct: (Y, Y) => Double) - (implicit yyt: TypeInformation[(Y, Y)]) -extends Loss[Y] with Serializable { - def loss(trueAndPredicted: DataSet[(Y, Y)]): DataSet[Double] = - trueAndPredicted.map(yy => lossFct(yy._1, yy._2)).mean() -} - - -object Loss { - /** - * Squared loss function - * - * returns (y1 - y2)' - * - * @return a Loss object - */ - def squaredLoss = new MeanLoss[Double]((y1,y2) => (y1 - y2) * (y1 - y2)) - - /** - * Zero One Loss Function - * - * returns 1 if outputs differ and 0 if they are equal - * - * @tparam T output type - * @return a Loss object - */ - def zeroOneLoss[T: TypeInformation: ClassTag] = new MeanLoss[T]((y1, y2) => if (y1 == y2) 0 else 1) - - /** - * Zero One Loss Function also usable for score information - * - * returns 1 if sign of outputs differ and 0 if the signs are equal - * - * @return a Loss object - */ - def zeroOneSignumLoss = new MeanLoss[Double]({ (y1, y2) => - val sy1 = scala.math.signum(y1) - val sy2 = scala.math.signum(y2) - if (sy1 == sy2) 0 else 1 - }) - -} diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala new file mode 100644 index 0000000000000..887293ec4a77f --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.evaluation + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml._ + +import scala.reflect.ClassTag + +/** + * Evaluation score + * + * Takes a whole data set and then computes the evaluation score on them (obviously, again encoded + * in a DataSet) + * + * @tparam PredictionType output type + */ +trait Score[PredictionType] { + def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] +} + +/** Traits to allow us to determine at runtime if a Score is a loss (lower is better) or a + * performance score (higher is better) + */ +trait Loss + +trait PerformanceScore + +/** + * Metrics expressible as a mean of a function taking output pairs as input + * + * @param scoringFct function to apply to all elements + * @tparam PredictionType output type + */ +class MeanScore[PredictionType: TypeInformation: ClassTag]( + scoringFct: (PredictionType, PredictionType) => Double) + (implicit yyt: TypeInformation[(PredictionType, PredictionType)]) + extends Score[PredictionType] with Serializable { + def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] = + trueAndPredicted.map(yy => scoringFct(yy._1, yy._2)).mean() +} + + + +object Score { + /** + * Squared loss function + * + * returns (y1 - y2)' + * + * @return a Loss object + */ + def squaredLoss = new MeanScore[Double]((y1,y2) => (y1 - y2) * (y1 - y2)) with Loss + + /** + * Zero One Loss Function + * + * returns 1 if outputs differ and 0 if they are equal + * + * @tparam T output type + * @return a Loss object + */ + def zeroOneLoss[T: TypeInformation: ClassTag] = { + //TODO: If T == Double, == comparison could be problematic + new MeanScore[T]((y1, y2) => if (y1 == y2) 0 else 1) with PerformanceScore + } + + /** + * Zero One Loss Function also usable for score information + * + * returns 1 if sign of outputs differ and 0 if the signs are equal + * + * @return a Loss object + */ + def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) => + val sy1 = scala.math.signum(y1) + val sy2 = scala.math.signum(y2) + if (sy1 == sy2) 0 else 1 + }) with PerformanceScore + +} diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Scorer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Scorer.scala new file mode 100644 index 0000000000000..db41fa7e7065e --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Scorer.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.ml.evaluation + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import org.apache.flink.ml.pipeline.Predictor + +import org.apache.flink.ml.pipeline.EvaluateDataSetOperation + +//TODO: Need to generalize type of Score (and evaluateOperation) +class Scorer(val metric: Score[Double]) extends WithParameters { + + def evaluate[Testing, PredictorInstance<: Predictor[PredictorInstance]]( + testing: DataSet[Testing], + predictorInstance: PredictorInstance, + evaluateParameters: ParameterMap = ParameterMap.Empty) + (implicit evaluateOperation: EvaluateDataSetOperation[PredictorInstance, Testing, Double]): + DataSet[Double] = { + + FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment) + val resultingParameters = predictorInstance.parameters ++ evaluateParameters + val predictions = predictorInstance. + evaluate[Testing, Double](testing, resultingParameters) + //TODO: Use parameters + metric.evaluate(predictions) + } + +} diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala index 9d11cff9e933c..d0a21f9f81eef 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala @@ -72,8 +72,8 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters { */ def evaluate[Testing, PredictionValue]( testing: DataSet[Testing], - evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit - evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue]) + evaluateParameters: ParameterMap = ParameterMap.Empty) + (implicit evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue]) : DataSet[(PredictionValue, PredictionValue)] = { FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment) evaluator.evaluateDataSet(this, evaluateParameters, testing) @@ -174,7 +174,7 @@ object Predictor { } } -/** Type class for the predict operation of [[Predictor]]. This predict operation works on DataSets. +/** Trait for the predict operation of [[Predictor]]. This predict operation works on DataSets. * * [[Predictor]]s either have to implement this trait or the [[PredictOperation]] trait. The * implementation has to be made available as an implicit value or function in the scope of @@ -233,11 +233,10 @@ trait PredictOperation[Instance, Model, Testing, Prediction] extends Serializabl * @param model The model representation of the prediciton algorithm * @return A label for the provided example of type [[Prediction]] */ - def predict(value: Testing, model: Model): - Prediction + def predict(value: Testing, model: Model): Prediction } -/** Type class for the evaluate operation of [[Predictor]]. This evaluate operation works on +/** Trait for the evaluate operation of [[Predictor]]. This evaluate operation works on * DataSets. * * It takes a [[DataSet]] of some type. For each element of this [[DataSet]] the evaluate method diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/LossTest.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala similarity index 58% rename from flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/LossTest.scala rename to flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala index f78ac0c56f6b4..949cca29fa9f3 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/LossTest.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.flink.ml.evaluation import org.apache.flink.api.scala._ @@ -8,21 +27,21 @@ import org.apache.flink.test.util.FlinkTestBase import org.scalatest.{FlatSpec, Matchers} -class LossTest +class ScoreTest extends FlatSpec with Matchers with FlinkTestBase { - behavior of "Loss Functions" + behavior of "Evaluation Score functions" it should "work for squared loss" in { val env = ExecutionEnvironment.getExecutionEnvironment val yy = env.fromCollection(Seq((0.0, 1.0), (0.0, 0.0), (3.0, 5.0))) - val loss = Loss.squaredLoss + val loss = Score.squaredLoss - val result = loss.loss(yy).collect() + val result = loss.evaluate(yy).collect() result.length shouldBe 1 result.head shouldBe (1.6666666666 +- 1e-4) @@ -33,9 +52,9 @@ class LossTest val yy = env.fromCollection(Seq("a" -> "a", "a" -> "b", "b" -> "c", "d" -> "d")) - val loss = Loss.zeroOneLoss[String] + val loss = Score.zeroOneLoss[String] - val result = loss.loss(yy).collect() + val result = loss.evaluate(yy).collect() result.length shouldBe 1 result.head shouldBe (0.5 +- 1e9) @@ -44,11 +63,12 @@ class LossTest it should "work for zero one loss applied to signs" in { val env = ExecutionEnvironment.getExecutionEnvironment - val yy = env.fromCollection(Seq[(Double,Double)](-2.3 -> 2.3, -1.0 -> -10.5, 2.0 -> 3.0, 4.0 -> -5.0)) + val yy = env.fromCollection(Seq[(Double,Double)]( + -2.3 -> 2.3, -1.0 -> -10.5, 2.0 -> 3.0, 4.0 -> -5.0)) - val loss = Loss.zeroOneSignumLoss + val loss = Score.zeroOneSignumLoss - val result = loss.loss(yy).collect() + val result = loss.evaluate(yy).collect() result.length shouldBe 1 result.head shouldBe (0.5 +- 1e9) @@ -69,7 +89,7 @@ class LossTest val labels = slr.evaluate(test) - val error = Loss.squaredLoss.loss(labels) + val error = Score.squaredLoss.evaluate(labels) val expectedError = noise*noise error.collect().head shouldBe (expectedError +- expectedError/5) diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala new file mode 100644 index 0000000000000..0269a8a843b6c --- /dev/null +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.ml.evaluation + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.ParameterMap +import org.apache.flink.ml.regression.RegressionData._ +import org.apache.flink.ml.regression.MultipleLinearRegression +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{FlatSpec, Matchers} + + +class ScorerITSuite extends FlatSpec with Matchers with FlinkTestBase { + + behavior of "the Scorer class" + + it should "work for squared loss" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val loss = Score.squaredLoss + + val scorer = new Scorer(loss) + + val mlr = MultipleLinearRegression() + + val parameters = ParameterMap() + + parameters.add(MultipleLinearRegression.Stepsize, 1.0) + parameters.add(MultipleLinearRegression.Iterations, 10) + parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001) + + val inputDS = env.fromCollection(data) + val evaluationDS = inputDS.map(x => (x.vector, x.label)) + + mlr.fit(inputDS, parameters) + + val mse = scorer.evaluate(evaluationDS, mlr).collect().head + + mse should be < 2.0 + } + +} From 4a7593ade68f43d444a6b289191f053a4ea8b031 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Thu, 25 Jun 2015 11:41:10 +0200 Subject: [PATCH 03/16] Adds accuracy score and R^2 score. Also trying out Scores as classes instead of functions. Not too happy with the extra biolerplate of Score as classes will probably revert, and have objects like RegressionsScores, ClassificationScores that contain the definitions of the relevant scores. --- .../apache/flink/ml/evaluation/Score.scala | 91 ++++++++++++------- .../apache/flink/ml/math/DenseVector.scala | 6 +- .../org/apache/flink/ml/math/Vector.scala | 2 +- .../flink/ml/evaluation/ScoreTest.scala | 59 +++++++++--- .../flink/ml/evaluation/ScorerITSuite.scala | 2 +- 5 files changed, 109 insertions(+), 51 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala index 887293ec4a77f..2cb43dd5e0f7d 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala @@ -49,50 +49,75 @@ trait PerformanceScore * @param scoringFct function to apply to all elements * @tparam PredictionType output type */ -class MeanScore[PredictionType: TypeInformation: ClassTag]( +abstract class MeanScore[PredictionType: TypeInformation: ClassTag]( scoringFct: (PredictionType, PredictionType) => Double) (implicit yyt: TypeInformation[(PredictionType, PredictionType)]) extends Score[PredictionType] with Serializable { - def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] = + def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] = { trueAndPredicted.map(yy => scoringFct(yy._1, yy._2)).mean() + } } +//TODO: Return to functions in companion object, classes are more cumbersome +/** + * Squared loss function + * + * returns (y1 - y2)' + * + * @return a Loss object + */ +class SquaredLoss extends MeanScore[Double]((y1,y2) => (y1 - y2) * (y1 - y2)) with Loss -object Score { - /** - * Squared loss function - * - * returns (y1 - y2)' - * - * @return a Loss object - */ - def squaredLoss = new MeanScore[Double]((y1,y2) => (y1 - y2) * (y1 - y2)) with Loss - - /** - * Zero One Loss Function - * - * returns 1 if outputs differ and 0 if they are equal - * - * @tparam T output type - * @return a Loss object - */ - def zeroOneLoss[T: TypeInformation: ClassTag] = { - //TODO: If T == Double, == comparison could be problematic - new MeanScore[T]((y1, y2) => if (y1 == y2) 0 else 1) with PerformanceScore - } +/** + * Zero One Loss Function + * + * returns 1 if outputs differ and 0 if they are equal + * + * @tparam T output type + * @return a Loss object + */ +class ZeroOneLoss[T: TypeInformation: ClassTag] + extends MeanScore[T]((y1, y2) => if (y1 == y2) 0 else 1) with Loss - /** - * Zero One Loss Function also usable for score information - * - * returns 1 if sign of outputs differ and 0 if the signs are equal - * - * @return a Loss object - */ - def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) => +/** + * Zero One Loss Function also usable for score information + * + * returns 1 if sign of outputs differ and 0 if the signs are equal + * + * @return a Loss object + */ +class ZeroOneSignumLoss + extends MeanScore[Double]({ (y1, y2) => val sy1 = scala.math.signum(y1) val sy2 = scala.math.signum(y2) if (sy1 == sy2) 0 else 1 - }) with PerformanceScore + }) + with Loss + +/** Calculates the fraction of correct predictions + * + */ +class AccuracyScore + extends MeanScore[Double]((y1, y2) => if (y1 == y2) 1 else 0) with PerformanceScore +/** Calculates the coefficient of determination, $R^2^$ + * + * $R^2^$ indicates how well the data fit the a calculated model + * Reference: [[http://en.wikipedia.org/wiki/Coefficient_of_determination]] + */ +class R2Score extends Score[Double] with PerformanceScore { + override def evaluate(trueAndPredicted: DataSet[(Double, Double)]): DataSet[Double] = { + val onlyTrue = trueAndPredicted.map(truthPrediction => truthPrediction._1) + val meanTruth = onlyTrue.mean() + + val squaredLoss = new SquaredLoss() + + val ssRes = squaredLoss.evaluate(trueAndPredicted) + val ssTot = squaredLoss.evaluate(onlyTrue.crossWithTiny(meanTruth)) + //TODO: Handle 0 in nominator or denominator + val r2 = ssRes.crossWithTiny(ssTot).map(resTot => 1 - (resTot._1/resTot._2)) + + r2 + } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala index f24249629145c..ee838ab08c886 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala @@ -18,7 +18,11 @@ package org.apache.flink.ml.math -import breeze.linalg.{SparseVector => BreezeSparseVector, DenseVector => BreezeDenseVector, Vector => BreezeVector} +import breeze.linalg.{ + SparseVector => BreezeSparseVector, + DenseVector => BreezeDenseVector, + Vector => BreezeVector, + DenseMatrix => BreezeDenseMatrix} /** * Dense vector implementation of [[Vector]]. The data is represented in a continuous array of diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala index c3a9a3951dfde..0e9fcbd4487ee 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala @@ -75,7 +75,7 @@ trait Vector extends Serializable { } } -object Vector{ +object Vector { /** BreezeVectorConverter implementation for [[Vector]] * * This allows to convert Breeze vectors into [[Vector]]. diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala index 949cca29fa9f3..8dd002e3d5d80 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala @@ -39,7 +39,7 @@ class ScoreTest val yy = env.fromCollection(Seq((0.0, 1.0), (0.0, 0.0), (3.0, 5.0))) - val loss = Score.squaredLoss + val loss = new SquaredLoss() val result = loss.evaluate(yy).collect() @@ -52,7 +52,7 @@ class ScoreTest val yy = env.fromCollection(Seq("a" -> "a", "a" -> "b", "b" -> "c", "d" -> "d")) - val loss = Score.zeroOneLoss[String] + val loss = new ZeroOneLoss[String]() val result = loss.evaluate(yy).collect() @@ -66,7 +66,7 @@ class ScoreTest val yy = env.fromCollection(Seq[(Double,Double)]( -2.3 -> 2.3, -1.0 -> -10.5, 2.0 -> 3.0, 4.0 -> -5.0)) - val loss = Score.zeroOneSignumLoss + val loss = new ZeroOneSignumLoss() val result = loss.evaluate(yy).collect() @@ -76,22 +76,51 @@ class ScoreTest it should "work with a slightly more involved case with linear regression" in { val env = ExecutionEnvironment.getExecutionEnvironment - val center = DenseVector(1.0, -2.0, 3.0, -4.0, 5.0) - val weights = DenseVector(2.0, 1.0, 0.0, -1.0, -2.0) - val n = 1000 - val noise = 0.5 - val ds = env.fromCollection(ToyData.singleGaussianLinearProblem(n, center, weights, noise)) + val center = DenseVector(1.0, -2.0, 3.0, -4.0, 5.0) + val weights = DenseVector(2.0, 1.0, 0.0, -1.0, -2.0) + val n = 1000 + val noise = 0.5 + val ds = env.fromCollection(ToyData.singleGaussianLinearProblem(n, center, weights, noise)) - val slr = new SimpleLeastSquaresRegression - slr.fit(ds) + val slr = new SimpleLeastSquaresRegression + slr.fit(ds) - val test = ds.map(x => (x.vector, x.label)) + val test = ds.map(x => (x.vector, x.label)) - val labels = slr.evaluate(test) + val labels = slr.evaluate(test) - val error = Score.squaredLoss.evaluate(labels) - val expectedError = noise*noise + val error = new SquaredLoss().evaluate(labels) + val expectedError = noise*noise - error.collect().head shouldBe (expectedError +- expectedError/5) + error.collect().head shouldBe (expectedError +- expectedError/5) + } + + it should "work for accuracy score" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val yy = env.fromCollection(Seq(0.0 -> 0.0, 1.0 -> 1.0, 2.0 -> 2.0, 3.0 -> 2.0)) + + val accuracyScore = new AccuracyScore() + + val result = accuracyScore.evaluate(yy).collect() + + result.length shouldBe 1 + result.head shouldBe (0.75 +- 1e9) + } + + it should "calculate the R2 score correctly" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + // List of 50 (i, i + 1.0) tuples, where i the index + val valueList = Range.Double(0.0, 50.0, 1.0).toList zip Range.Double(0.0, 50.0, 1.0).map(_ + 1) + + val yy = env.fromCollection(valueList) + + val r2 = new R2Score() + + val result = r2.evaluate(yy).collect() + + result.length shouldBe 1 + result.head shouldBe (0.995 +- 1e9) } } diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala index 0269a8a843b6c..540748e334e0e 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala @@ -32,7 +32,7 @@ class ScorerITSuite extends FlatSpec with Matchers with FlinkTestBase { it should "work for squared loss" in { val env = ExecutionEnvironment.getExecutionEnvironment - val loss = Score.squaredLoss + val loss = new SquaredLoss() val scorer = new Scorer(loss) From 5c89c478bd00f168bfe48954d06367b28f948571 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Fri, 26 Jun 2015 13:30:56 +0200 Subject: [PATCH 04/16] Adds a evaluate operation for LabeledVector input --- .../scala/org/apache/flink/ml/package.scala | 4 ++ .../apache/flink/ml/pipeline/Predictor.scala | 40 ++++++++++++++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala index 5d73ab47517f8..c9822fcc4090e 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala @@ -71,6 +71,10 @@ package object ml { .withBroadcastSet(broadcastVariable, "broadcastVariable") } + /** Calculates the mean value of a DataSet[T <\: Numeric[T]] + * + * @return A DataSet[Double] with the mean value as its only element + */ def mean()(implicit num: Numeric[T], ttit: TypeInformation[(T, Int)]): DataSet[Double] = dataSet.map(x => (x, 1)) .reduce((xc, yc) => (num.plus(xc._1, yc._1), xc._2 + yc._2)) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala index d0a21f9f81eef..af2f424054232 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala @@ -22,7 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.ml._ -import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} +import org.apache.flink.ml.common.{LabeledVector, FlinkMLTools, ParameterMap, WithParameters} +import org.apache.flink.ml.math.{Vector => FlinkVector} /** Predictor trait for Flink's pipeline operators. * @@ -172,6 +173,43 @@ object Predictor { } } } + + /** [[EvaluateDataSetOperation]] which takes a [[PredictOperation]] to calculate a tuple + * of true label value and predicted label value, when the provided with a DataSet of + * [[LabeledVector]]. + * + * Note: We have to put the TypeInformation implicit values for Testing and PredictionValue after + * the PredictOperation implicit parameter. Otherwise, if it's defined as a context bound, then + * the Scala compiler does not find the implicit [[PredictOperation]] value. + * + * @param predictOperation An implicit PredictOperation that takes a Flink Vector and returns + * a Double + * @tparam Instance The [[Predictor]] instance that calls the function + * @tparam Model The model that the calling [[Predictor]] uses for predictions + * @return An EvaluateDataSetOperation for LabeledVector + */ + implicit def LabeledVectorEvaluateDataSetOperation[ + Instance <: Predictor[Instance], + Model]( + implicit predictOperation: PredictOperation[Instance, Model, FlinkVector, Double]) + : EvaluateDataSetOperation[Instance, LabeledVector, Double] = { + new EvaluateDataSetOperation[Instance, LabeledVector, Double] { + override def evaluateDataSet( + instance: Instance, + evaluateParameters: ParameterMap, + testing: DataSet[LabeledVector]) + : DataSet[(Double, Double)] = { + val resultingParameters = instance.parameters ++ evaluateParameters + val model = predictOperation.getModel(instance, resultingParameters) + + testing.mapWithBcVariable(model){ + (element, model) => { + (element.label, predictOperation.predict(element.vector, model)) + } + } + } + } + } } /** Trait for the predict operation of [[Predictor]]. This predict operation works on DataSets. From e7bb4b42424641d640df370cd6ace71f7f42ee8d Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Fri, 26 Jun 2015 13:32:13 +0200 Subject: [PATCH 05/16] Adds Regressor interface, and a score function for regression algorithms. --- .../apache/flink/ml/common/ParameterMap.scala | 2 +- .../apache/flink/ml/evaluation/Score.scala | 21 ++++++++---- .../apache/flink/ml/evaluation/Scorer.scala | 3 +- .../apache/flink/ml/pipeline/Predictor.scala | 2 +- .../regression/MultipleLinearRegression.scala | 5 +-- .../flink/ml/regression/Regressor.scala | 34 +++++++++++++++++++ .../flink/ml/evaluation/ScoreTest.scala | 16 +++++++++ .../MultipleLinearRegressionITSuite.scala | 26 +++++++++++++- 8 files changed, 96 insertions(+), 13 deletions(-) create mode 100644 flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ParameterMap.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ParameterMap.scala index 77d2d46b5b912..551f37a97c0e5 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ParameterMap.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ParameterMap.scala @@ -112,7 +112,7 @@ object ParameterMap { * * @tparam T Type of parameter value associated to this parameter key */ -trait Parameter[T] { +trait Parameter[+T] { /** * Default value of parameter. If no such value exists, then returns [[None]] diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala index 2cb43dd5e0f7d..62fc7e1faf2c5 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala @@ -111,12 +111,21 @@ class R2Score extends Score[Double] with PerformanceScore { val onlyTrue = trueAndPredicted.map(truthPrediction => truthPrediction._1) val meanTruth = onlyTrue.mean() - val squaredLoss = new SquaredLoss() - - val ssRes = squaredLoss.evaluate(trueAndPredicted) - val ssTot = squaredLoss.evaluate(onlyTrue.crossWithTiny(meanTruth)) - //TODO: Handle 0 in nominator or denominator - val r2 = ssRes.crossWithTiny(ssTot).map(resTot => 1 - (resTot._1/resTot._2)) + val ssRes = trueAndPredicted + .map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) + val ssTot = onlyTrue + .crossWithTiny(meanTruth).map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) + val r2 = ssRes.crossWithTiny(ssTot).map{resTot => + val ssRes = resTot._1 + val ssTot = resTot._2 + // We avoid dividing by 0 and just assign 0.0 + if (ssTot == 0.0) { + 0.0 + } + else { + 1 - (ssRes / ssTot) + } + } r2 } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Scorer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Scorer.scala index db41fa7e7065e..1190d2f3af569 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Scorer.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Scorer.scala @@ -35,8 +35,7 @@ class Scorer(val metric: Score[Double]) extends WithParameters { FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment) val resultingParameters = predictorInstance.parameters ++ evaluateParameters - val predictions = predictorInstance. - evaluate[Testing, Double](testing, resultingParameters) + val predictions = predictorInstance.evaluate[Testing, Double](testing, resultingParameters) //TODO: Use parameters metric.evaluate(predictions) } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala index af2f424054232..ee25e6d6091e1 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala @@ -175,7 +175,7 @@ object Predictor { } /** [[EvaluateDataSetOperation]] which takes a [[PredictOperation]] to calculate a tuple - * of true label value and predicted label value, when the provided with a DataSet of + * of true label value and predicted label value, when provided with a DataSet of * [[LabeledVector]]. * * Note: We have to put the TypeInformation implicit values for Testing and PredictionValue after diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala index c3b318227ea65..1cebb9c8dd463 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala @@ -85,7 +85,7 @@ import org.apache.flink.ml.pipeline.{PredictOperation, FitOperation, Predictor} * Threshold for relative change of sum of squared residuals until convergence. * */ -class MultipleLinearRegression extends Predictor[MultipleLinearRegression] { +class MultipleLinearRegression extends Regressor[MultipleLinearRegression]{ import org.apache.flink.ml._ import MultipleLinearRegression._ @@ -201,7 +201,8 @@ object MultipleLinearRegression { } } } - override def predict(value: T, model: WeightVector): Double = { + override def predict(value: T, model: WeightVector, predictParameters: ParameterMap): + Double = { import Breeze._ val WeightVector(weights, weight0) = model val dotProduct = value.asBreeze.dot(weights.asBreeze) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala new file mode 100644 index 0000000000000..c3763449fe326 --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.regression + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.evaluation.R2Score +import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, Predictor} + +trait Regressor[Self] extends Predictor[Self]{ + that: Self => + + def score(testing: DataSet[LabeledVector]) + (implicit evaluateOperation: EvaluateDataSetOperation[Self, LabeledVector, Double]): + DataSet[Double] = { + new R2Score().evaluate(this.evaluate[LabeledVector, Double](testing)) + } +} diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala index 8dd002e3d5d80..1cf4a29063de5 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala @@ -123,4 +123,20 @@ class ScoreTest result.length shouldBe 1 result.head shouldBe (0.995 +- 1e9) } + + it should "calculate the R2 score correctly for edge cases" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + // List of 50 (i, i + 1.0) tuples, where i the index + val valueList = Array.ofDim[Double](50) zip Array.ofDim[Double](50).map(_ + 1.0) + + val yy = env.fromCollection(valueList) + + val r2 = new R2Score() + + val result = r2.evaluate(yy).collect() + + result.length shouldBe 1 + result.head shouldBe (0.0 +- 1e9) + } } diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala index 4e78ba56f8b04..4878de8f9cadb 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala @@ -19,7 +19,7 @@ package org.apache.flink.ml.regression import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.flink.ml.common.{WeightVector, ParameterMap} +import org.apache.flink.ml.common.{LabeledVector, WeightVector, ParameterMap} import org.apache.flink.ml.preprocessing.PolynomialFeatures import org.scalatest.{Matchers, FlatSpec} @@ -132,4 +132,28 @@ class MultipleLinearRegressionITSuite absoluteErrorSum should be < 50.0 } + + it should "calculate its score correctly" in { + val env = ExecutionEnvironment.getExecutionEnvironment + val expectedR2 = 0.29310994289260195 + + val mlr = MultipleLinearRegression() + + import RegressionData._ + + val parameters = ParameterMap() + + parameters.add(MultipleLinearRegression.Stepsize, 10.0) + parameters.add(MultipleLinearRegression.Iterations, 100) +// parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.0001) + + val inputDS = env.fromCollection(data) + val evaluationDS = inputDS + + mlr.fit(inputDS, parameters) + + val r2Score = mlr.score(evaluationDS).collect().head + + r2Score should be (expectedR2 +- 0.1) + } } From 3d8a6928b02b30c732f282df61613561dbf8d4fc Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Tue, 30 Jun 2015 16:04:58 +0200 Subject: [PATCH 06/16] Added Classifier intermediate class, and default score function for classifiers. --- .../flink/ml/classification/Classifier.scala | 34 +++++++++++++ .../apache/flink/ml/classification/SVM.scala | 2 +- .../flink/ml/classification/SVMITSuite.scala | 51 ++++++++++--------- 3 files changed, 63 insertions(+), 24 deletions(-) create mode 100644 flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala new file mode 100644 index 0000000000000..48d0239899cf7 --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.classification + +import org.apache.flink.api.scala.DataSet +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.evaluation.AccuracyScore +import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, Predictor} + +trait Classifier[Self] extends Predictor[Self]{ + that: Self => + + def score(testing: DataSet[LabeledVector]) + (implicit evaluateOperation: EvaluateDataSetOperation[Self, LabeledVector, Double]): + DataSet[Double] = { + new AccuracyScore().evaluate(this.evaluate[LabeledVector, Double](testing)) + } +} diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala index 4c539d9f0784f..d1bb938fc41fe 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala @@ -134,7 +134,7 @@ import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector} * distance to the hyperplane for each example. Setting it to false will return the binary * class label (+1.0, -1.0) (Default value: '''false''') */ -class SVM extends Predictor[SVM] { +class SVM extends Classifier[SVM] { import SVM._ diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala index 57a77839db948..293beee2f06ac 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala @@ -18,6 +18,7 @@ package org.apache.flink.ml.classification +import org.apache.flink.ml.common.LabeledVector import org.scalatest.{FlatSpec, Matchers} import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector} @@ -28,22 +29,29 @@ class SVMITSuite extends FlatSpec with Matchers with FlinkTestBase { behavior of "The SVM using CoCoA implementation" - it should "train a SVM" in { + def fixture = new { val env = ExecutionEnvironment.getExecutionEnvironment val svm = SVM(). - setBlocks(env.getParallelism). - setIterations(100). - setLocalIterations(100). - setRegularization(0.002). - setStepsize(0.1). - setSeed(0) + setBlocks(env.getParallelism). + setIterations(100). + setLocalIterations(100). + setRegularization(0.002). + setStepsize(0.1). + setSeed(0) val trainingDS = env.fromCollection(Classification.trainingData) + val test = trainingDS.map(x => x.vector) + svm.fit(trainingDS) + } + + it should "train a SVM" in { - val weightVector = svm.weightsOption.get.collect().head + val f = fixture + + val weightVector = f.svm.weightsOption.get.collect().head weightVector.valueIterator.zip(Classification.expectedWeightVector.valueIterator).foreach { case (weight, expectedWeight) => @@ -52,23 +60,12 @@ class SVMITSuite extends FlatSpec with Matchers with FlinkTestBase { } it should "make (mostly) correct predictions" in { - val env = ExecutionEnvironment.getExecutionEnvironment - val svm = SVM(). - setBlocks(env.getParallelism). - setIterations(100). - setLocalIterations(100). - setRegularization(0.002). - setStepsize(0.1). - setSeed(0) + val f = fixture - val trainingDS = env.fromCollection(Classification.trainingData) + val test = f.trainingDS - val test = trainingDS.map(x => (x.vector, x.label)) - - svm.fit(trainingDS) - - val predictionPairs = svm.evaluate(test) + val predictionPairs = f.svm.evaluate(test) val absoluteErrorSum = predictionPairs.collect().map{ case (truth, prediction) => Math.abs(truth - prediction)}.sum @@ -97,8 +94,16 @@ class SVMITSuite extends FlatSpec with Matchers with FlinkTestBase { val rawPrediction = svm.predict(test).map(vectorLabel => vectorLabel._2).collect().head - rawPrediction should be (15.0 +- 1e-9) + rawPrediction should be (15.0 +- 1e-9) + } + + it should "correctly calculate its score" in { + val f = fixture + + val test = f.trainingDS + val score = f.svm.score(test).collect().head + score should be > 0.9 } } From e1a26ed30bb784633685703892f67d51136f6060 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Wed, 1 Jul 2015 10:20:41 +0200 Subject: [PATCH 07/16] Going back to having scores defined in objects instead of their own classes. --- .../flink/ml/classification/Classifier.scala | 4 +- .../apache/flink/ml/common/ParameterMap.scala | 2 +- .../apache/flink/ml/evaluation/Score.scala | 125 +++++++++--------- .../flink/ml/regression/Regressor.scala | 4 +- .../flink/ml/classification/SVMITSuite.scala | 3 +- .../flink/ml/evaluation/ScoreTest.scala | 14 +- .../flink/ml/evaluation/ScorerITSuite.scala | 2 +- 7 files changed, 79 insertions(+), 75 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala index 48d0239899cf7..a8f92c5e9a790 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala @@ -20,7 +20,7 @@ package org.apache.flink.ml.classification import org.apache.flink.api.scala.DataSet import org.apache.flink.ml.common.LabeledVector -import org.apache.flink.ml.evaluation.AccuracyScore +import org.apache.flink.ml.evaluation.ClassificationScores import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, Predictor} trait Classifier[Self] extends Predictor[Self]{ @@ -29,6 +29,6 @@ trait Classifier[Self] extends Predictor[Self]{ def score(testing: DataSet[LabeledVector]) (implicit evaluateOperation: EvaluateDataSetOperation[Self, LabeledVector, Double]): DataSet[Double] = { - new AccuracyScore().evaluate(this.evaluate[LabeledVector, Double](testing)) + ClassificationScores.accuracyScore.evaluate(this.evaluate[LabeledVector, Double](testing)) } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ParameterMap.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ParameterMap.scala index 551f37a97c0e5..77d2d46b5b912 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ParameterMap.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ParameterMap.scala @@ -112,7 +112,7 @@ object ParameterMap { * * @tparam T Type of parameter value associated to this parameter key */ -trait Parameter[+T] { +trait Parameter[T] { /** * Default value of parameter. If no such value exists, then returns [[None]] diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala index 62fc7e1faf2c5..61b1f147b1b29 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala @@ -58,75 +58,78 @@ abstract class MeanScore[PredictionType: TypeInformation: ClassTag]( } } +object RegressionScores { + /** + * Squared loss function + * + * returns (y1 - y2)' + * + * @return a Loss object + */ + def squaredLoss = new MeanScore[Double]((y1,y2) => (y1 - y2) * (y1 - y2)) with Loss -//TODO: Return to functions in companion object, classes are more cumbersome -/** - * Squared loss function - * - * returns (y1 - y2)' - * - * @return a Loss object - */ -class SquaredLoss extends MeanScore[Double]((y1,y2) => (y1 - y2) * (y1 - y2)) with Loss - -/** - * Zero One Loss Function - * - * returns 1 if outputs differ and 0 if they are equal - * - * @tparam T output type - * @return a Loss object - */ -class ZeroOneLoss[T: TypeInformation: ClassTag] - extends MeanScore[T]((y1, y2) => if (y1 == y2) 0 else 1) with Loss - -/** - * Zero One Loss Function also usable for score information - * - * returns 1 if sign of outputs differ and 0 if the signs are equal - * - * @return a Loss object - */ -class ZeroOneSignumLoss - extends MeanScore[Double]({ (y1, y2) => + /** + * Zero One Loss Function also usable for score information + * + * returns 1 if sign of outputs differ and 0 if the signs are equal + * + * @return a Loss object + */ + def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) => val sy1 = scala.math.signum(y1) val sy2 = scala.math.signum(y2) if (sy1 == sy2) 0 else 1 - }) - with Loss - -/** Calculates the fraction of correct predictions - * - */ -class AccuracyScore - extends MeanScore[Double]((y1, y2) => if (y1 == y2) 1 else 0) with PerformanceScore + }) with Loss -/** Calculates the coefficient of determination, $R^2^$ - * - * $R^2^$ indicates how well the data fit the a calculated model - * Reference: [[http://en.wikipedia.org/wiki/Coefficient_of_determination]] - */ -class R2Score extends Score[Double] with PerformanceScore { - override def evaluate(trueAndPredicted: DataSet[(Double, Double)]): DataSet[Double] = { - val onlyTrue = trueAndPredicted.map(truthPrediction => truthPrediction._1) - val meanTruth = onlyTrue.mean() + /** Calculates the coefficient of determination, $R^2^$ + * + * $R^2^$ indicates how well the data fit the a calculated model + * Reference: [[http://en.wikipedia.org/wiki/Coefficient_of_determination]] + */ + def r2Score = new Score[Double] with PerformanceScore { + override def evaluate(trueAndPredicted: DataSet[(Double, Double)]): DataSet[Double] = { + val onlyTrue = trueAndPredicted.map(truthPrediction => truthPrediction._1) + val meanTruth = onlyTrue.mean() - val ssRes = trueAndPredicted - .map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) - val ssTot = onlyTrue - .crossWithTiny(meanTruth).map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) - val r2 = ssRes.crossWithTiny(ssTot).map{resTot => - val ssRes = resTot._1 - val ssTot = resTot._2 - // We avoid dividing by 0 and just assign 0.0 - if (ssTot == 0.0) { - 0.0 - } - else { - 1 - (ssRes / ssTot) + val ssRes = trueAndPredicted + .map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) + val ssTot = onlyTrue + .crossWithTiny(meanTruth).map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) + val r2 = ssRes.crossWithTiny(ssTot).map{resTot => + val ssRes = resTot._1 + val ssTot = resTot._2 + // We avoid dividing by 0 and just assign 0.0 + if (ssTot == 0.0) { + 0.0 + } + else { + 1 - (ssRes / ssTot) + } } + r2 } + } +} + +object ClassificationScores { + /** Calculates the fraction of correct predictions + * + */ + def accuracyScore = new MeanScore[Double]((y1, y2) => if (y1 == y2) 1 else 0) + with PerformanceScore - r2 + /** + * Zero One Loss Function + * + * returns 1 if outputs differ and 0 if they are equal + * + * @tparam T output type + * @return a Loss object + */ + def zeroOneLoss[T: TypeInformation: ClassTag] = { + //TODO: If T == Double, == comparison could be problematic + new MeanScore[T]((y1, y2) => if (y1 == y2) 0 else 1) with Loss } } + + diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala index c3763449fe326..f99ba22f72882 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala @@ -20,7 +20,7 @@ package org.apache.flink.ml.regression import org.apache.flink.api.scala.DataSet import org.apache.flink.ml.common.LabeledVector -import org.apache.flink.ml.evaluation.R2Score +import org.apache.flink.ml.evaluation.RegressionScores import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, Predictor} trait Regressor[Self] extends Predictor[Self]{ @@ -29,6 +29,6 @@ trait Regressor[Self] extends Predictor[Self]{ def score(testing: DataSet[LabeledVector]) (implicit evaluateOperation: EvaluateDataSetOperation[Self, LabeledVector, Double]): DataSet[Double] = { - new R2Score().evaluate(this.evaluate[LabeledVector, Double](testing)) + RegressionScores.r2Score.evaluate(this.evaluate[LabeledVector, Double](testing)) } } diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala index 293beee2f06ac..5cde2d9b9d78b 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala @@ -40,6 +40,7 @@ class SVMITSuite extends FlatSpec with Matchers with FlinkTestBase { setStepsize(0.1). setSeed(0) + val trainingDS = env.fromCollection(Classification.trainingData) val test = trainingDS.map(x => x.vector) @@ -81,7 +82,7 @@ class SVMITSuite extends FlatSpec with Matchers with FlinkTestBase { .setOutputDecisionFunction(false) val customWeights = env.fromElements(DenseVector(1.0, 1.0, 1.0)) - + svm.weightsOption = Option(customWeights) val test = env.fromElements(DenseVector(5.0, 5.0, 5.0)) diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala index 1cf4a29063de5..ccda72944f773 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala @@ -39,7 +39,7 @@ class ScoreTest val yy = env.fromCollection(Seq((0.0, 1.0), (0.0, 0.0), (3.0, 5.0))) - val loss = new SquaredLoss() + val loss = RegressionScores.squaredLoss val result = loss.evaluate(yy).collect() @@ -52,7 +52,7 @@ class ScoreTest val yy = env.fromCollection(Seq("a" -> "a", "a" -> "b", "b" -> "c", "d" -> "d")) - val loss = new ZeroOneLoss[String]() + val loss = ClassificationScores.zeroOneLoss[String] val result = loss.evaluate(yy).collect() @@ -66,7 +66,7 @@ class ScoreTest val yy = env.fromCollection(Seq[(Double,Double)]( -2.3 -> 2.3, -1.0 -> -10.5, 2.0 -> 3.0, 4.0 -> -5.0)) - val loss = new ZeroOneSignumLoss() + val loss = RegressionScores.zeroOneSignumLoss val result = loss.evaluate(yy).collect() @@ -89,7 +89,7 @@ class ScoreTest val labels = slr.evaluate(test) - val error = new SquaredLoss().evaluate(labels) + val error = RegressionScores.squaredLoss.evaluate(labels) val expectedError = noise*noise error.collect().head shouldBe (expectedError +- expectedError/5) @@ -100,7 +100,7 @@ class ScoreTest val yy = env.fromCollection(Seq(0.0 -> 0.0, 1.0 -> 1.0, 2.0 -> 2.0, 3.0 -> 2.0)) - val accuracyScore = new AccuracyScore() + val accuracyScore = ClassificationScores.accuracyScore val result = accuracyScore.evaluate(yy).collect() @@ -116,7 +116,7 @@ class ScoreTest val yy = env.fromCollection(valueList) - val r2 = new R2Score() + val r2 = RegressionScores.r2Score val result = r2.evaluate(yy).collect() @@ -132,7 +132,7 @@ class ScoreTest val yy = env.fromCollection(valueList) - val r2 = new R2Score() + val r2 = RegressionScores.r2Score val result = r2.evaluate(yy).collect() diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala index 540748e334e0e..68b71e7055344 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala @@ -32,7 +32,7 @@ class ScorerITSuite extends FlatSpec with Matchers with FlinkTestBase { it should "work for squared loss" in { val env = ExecutionEnvironment.getExecutionEnvironment - val loss = new SquaredLoss() + val loss = RegressionScores.squaredLoss val scorer = new Scorer(loss) From 0dd251a5a59cd610c4df3e9a1ea3921b1a9cc2e0 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Wed, 1 Jul 2015 15:00:37 +0200 Subject: [PATCH 08/16] Removed ParameterMap from predict function of PredictOperation --- .../main/scala/org/apache/flink/ml/pipeline/Predictor.scala | 4 ---- .../apache/flink/ml/regression/MultipleLinearRegression.scala | 3 +-- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala index ee25e6d6091e1..e8e19ee29c281 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala @@ -178,10 +178,6 @@ object Predictor { * of true label value and predicted label value, when provided with a DataSet of * [[LabeledVector]]. * - * Note: We have to put the TypeInformation implicit values for Testing and PredictionValue after - * the PredictOperation implicit parameter. Otherwise, if it's defined as a context bound, then - * the Scala compiler does not find the implicit [[PredictOperation]] value. - * * @param predictOperation An implicit PredictOperation that takes a Flink Vector and returns * a Double * @tparam Instance The [[Predictor]] instance that calls the function diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala index 1cebb9c8dd463..c0503ef1f8c32 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala @@ -201,8 +201,7 @@ object MultipleLinearRegression { } } } - override def predict(value: T, model: WeightVector, predictParameters: ParameterMap): - Double = { + override def predict(value: T, model: WeightVector): Double = { import Breeze._ val WeightVector(weights, weight0) = model val dotProduct = value.asBreeze.dot(weights.asBreeze) From 492e9a383af6285f0fdca5031d2bd7bdfe3cd511 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Thu, 2 Jul 2015 12:21:28 +0200 Subject: [PATCH 09/16] Reworked score functionality allow chained Predictors. All predictors must now implement a calculateScore function. We are for now assuming that predictors are supervised learning algorithms, once unsupervised learning algorithms are added this will need to be reworked. Also added an evaluate dataset operation to ALS, to allow for scoring of the algorithm. Default performance measure for ALS is RMSE. --- .../flink/ml/classification/Classifier.scala | 23 ++++-- .../apache/flink/ml/evaluation/Score.scala | 8 ++- .../flink/ml/pipeline/ChainedPredictor.scala | 8 ++- .../apache/flink/ml/pipeline/Predictor.scala | 29 +++++++- .../apache/flink/ml/recommendation/ALS.scala | 70 +++++++++++++++---- .../flink/ml/regression/Regressor.scala | 23 ++++-- .../flink/ml/classification/SVMITSuite.scala | 1 - .../flink/ml/evaluation/ScoreTest.scala | 24 ------- .../flink/ml/evaluation/ScorerITSuite.scala | 47 +++++++++++-- .../flink/ml/recommendation/ALSITSuite.scala | 38 ++++++---- 10 files changed, 199 insertions(+), 72 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala index a8f92c5e9a790..4fdeb2a54c2b0 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala @@ -18,17 +18,26 @@ package org.apache.flink.ml.classification -import org.apache.flink.api.scala.DataSet -import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.api.scala._ import org.apache.flink.ml.evaluation.ClassificationScores -import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, Predictor} +import org.apache.flink.ml.pipeline.Predictor +/** Trait that classification algorithms should implement + * + * @tparam Self Type of the implementing class + */ trait Classifier[Self] extends Predictor[Self]{ that: Self => - def score(testing: DataSet[LabeledVector]) - (implicit evaluateOperation: EvaluateDataSetOperation[Self, LabeledVector, Double]): - DataSet[Double] = { - ClassificationScores.accuracyScore.evaluate(this.evaluate[LabeledVector, Double](testing)) + override def calculateScore[Prediction](input: DataSet[(Prediction, Prediction)]) + : DataSet[Double] = { + val tpi = input.getType() + if (tpi == createTypeInformation[(Double, Double)]) { + val doubleInput = input.asInstanceOf[DataSet[(Double, Double)]] + ClassificationScores.accuracyScore.evaluate(doubleInput) + } + else { + throw new UnsupportedOperationException("ALS should have Double predictions") + } } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala index 61b1f147b1b29..6dcc360e259ae 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala @@ -115,8 +115,8 @@ object ClassificationScores { /** Calculates the fraction of correct predictions * */ - def accuracyScore = new MeanScore[Double]((y1, y2) => if (y1 == y2) 1 else 0) - with PerformanceScore + def accuracyScore = + new MeanScore[Double]((y1, y2) => if (y1 == y2) 1 else 0) with PerformanceScore /** * Zero One Loss Function @@ -127,7 +127,9 @@ object ClassificationScores { * @return a Loss object */ def zeroOneLoss[T: TypeInformation: ClassTag] = { - //TODO: If T == Double, == comparison could be problematic + // TODO: If T == Double, == comparison could be problematic + // Also, if we plan to use LabeledVector for all classification tasks, the type parameter can be + // removed new MeanScore[T]((y1, y2) => if (y1 == y2) 0 else 1) with Loss } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala index bf5a8b2893880..e4f589e86f38b 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala @@ -37,7 +37,13 @@ import org.apache.flink.ml.common.ParameterMap * @tparam P Type of the trailing [[Predictor]] */ case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](transformer: T, predictor: P) - extends Predictor[ChainedPredictor[T, P]]{} + extends Predictor[ChainedPredictor[T, P]]{ + + override def calculateScore[Prediction](input: DataSet[(Prediction, Prediction)]) + : DataSet[Double] = { + predictor.calculateScore(input) + } +} object ChainedPredictor{ diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala index e8e19ee29c281..f5227ed5f19ab 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.ml._ import org.apache.flink.ml.common.{LabeledVector, FlinkMLTools, ParameterMap, WithParameters} +import org.apache.flink.ml.evaluation.ClassificationScores import org.apache.flink.ml.math.{Vector => FlinkVector} /** Predictor trait for Flink's pipeline operators. @@ -60,7 +61,7 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters { predictor.predictDataSet(this, predictParameters, testing) } - /** Evaluates the testing data by computing the prediction value and returning a pair of true + /** Computes a prediction value for each example in the testing data and returns a pair of true * label value and prediction value. It is important that the implementation chooses a Testing * type from which it can extract the true label value. * @@ -79,6 +80,32 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters { FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment) evaluator.evaluateDataSet(this, evaluateParameters, testing) } + + /** Calculates a numerical score for the [[Predictor]] + * + * By convention, higher scores are considered better, so even if a loss is used as a performance + * measure, it will be negated, so that that higher is better. + * @param testing The evaluation DataSet, that contains the features and the true value + * @param evaluateOperation An EvaluateDataSetOperation that produces Double results + * @tparam Testing The type of the features and true value, for example [[LabeledVector]] + * @return A DataSet containing one Double that indicates the score of the predictor + */ + def score[Testing](testing: DataSet[Testing]) + (implicit evaluateOperation: EvaluateDataSetOperation[Self, Testing, Double]): + DataSet[Double] = { + // TODO: Generalized so that we don't necessarily expect Double prediction values + calculateScore(this.evaluate[Testing, Double](testing)) + } + + /** Calculates the performance score for the algorithm, given a DataSet of (truth, prediction) + * tuples + * + * @param input A DataSet of (truth, prediction) tuples + * @tparam Prediction The type of the supervised label, for example a numerical class label. + * @return A DataSet containing one Double that indicates the score of the predictor + */ + private[ml] def calculateScore[Prediction](input: DataSet[(Prediction, Prediction)]): + DataSet[Double] } object Predictor { diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala index d8af42f34f132..b2d35ef1903d2 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala @@ -25,11 +25,15 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.common.operators.Order import org.apache.flink.core.memory.{DataOutputView, DataInputView} import org.apache.flink.ml.common._ -import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor} +import org.apache.flink.ml.evaluation.RegressionScores +import org.apache.flink.ml.math.{DenseVector, BLAS} +import org.apache.flink.ml.pipeline._ import org.apache.flink.types.Value import org.apache.flink.util.Collector -import org.apache.flink.api.common.functions.{Partitioner => FlinkPartitioner, GroupReduceFunction, CoGroupFunction} +import org.apache.flink.api.common.functions.{Partitioner => FlinkPartitioner, + GroupReduceFunction, CoGroupFunction} +// TODO: Use only one BLAS interface import com.github.fommil.netlib.BLAS.{ getInstance => blas } import com.github.fommil.netlib.LAPACK.{ getInstance => lapack } import org.netlib.util.intW @@ -147,7 +151,7 @@ class ALS extends Predictor[ALS] { } /** Sets the number of iterations of the ALS algorithm - * + * * @param iterations * @return */ @@ -157,7 +161,7 @@ class ALS extends Predictor[ALS] { } /** Sets the number of blocks into which the user and item matrix shall be partitioned - * + * * @param blocks * @return */ @@ -167,7 +171,7 @@ class ALS extends Predictor[ALS] { } /** Sets the random seed for the initial item matrix initialization - * + * * @param seed * @return */ @@ -178,7 +182,7 @@ class ALS extends Predictor[ALS] { /** Sets the temporary path into which intermediate results are written in order to increase * performance. - * + * * @param temporaryPath * @return */ @@ -253,6 +257,25 @@ class ALS extends Predictor[ALS] { "Prior to predicting values, it has to be trained on data.") } } + + /** Calculates the RMSE for the algorithm, given a DataSet of (truth, prediction) tuples + * + * @param input A DataSet of (truth, prediction) tuples + * @tparam Prediction The type of the supervised label, for example a numerical rating. + * @return A DataSet containing one Double that indicates the RMSE score of the predictor + */ + override def calculateScore[Prediction](input: DataSet[(Prediction, Prediction)]) + : DataSet[Double] = { + // TODO: Move function to a Recommender trait? + val tpi = input.getType() + if (tpi == createTypeInformation[(Double, Double)]) { + val doubleInput = input.asInstanceOf[DataSet[(Double, Double)]] + RegressionScores.squaredLoss.evaluate(doubleInput).map(Math.sqrt(_)) + } + else { + throw new UnsupportedOperationException("ALS should have Double predictions") + } + } } object ALS { @@ -407,12 +430,7 @@ object ALS { val uFactorsVector = uFactors.factors val iFactorsVector = iFactors.factors - val prediction = blas.ddot( - uFactorsVector.length, - uFactorsVector, - 1, - iFactorsVector, - 1) + val prediction = BLAS.dot(DenseVector(uFactorsVector), DenseVector(iFactorsVector)) (uID, iID, prediction) } @@ -425,6 +443,34 @@ object ALS { } } + implicit val evaluateRatings = new EvaluateDataSetOperation[ALS, (Int, Int, Double), Double] { + override def evaluateDataSet( + instance: ALS, + evaluateParameters: ParameterMap, + testing: DataSet[(Int, Int, Double)]): DataSet[(Double, Double)] = { + instance.factorsOption match { + case Some((userFactors, itemFactors)) => { + testing.join(userFactors, JoinHint.REPARTITION_HASH_SECOND).where(0).equalTo(0) + .join(itemFactors, JoinHint.REPARTITION_HASH_SECOND).where("_1._2").equalTo(0).map { + tuple => { + val (((uID, iID, truth), uFactors), iFactors) = tuple + + val uFactorsVector = uFactors.factors + val iFactorsVector = iFactors.factors + + val prediction = BLAS.dot(DenseVector(uFactorsVector), DenseVector(iFactorsVector)) + + (truth, prediction) + } + } + } + + case None => throw new RuntimeException("The ALS model has not been fitted to data. " + + "Prior to predicting values, it has to be trained on data.") + } + } + } + /** Calculates the matrix factorization for the given ratings. A rating is defined as * a tuple of user ID, item ID and the corresponding rating. * diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala index f99ba22f72882..5ea2d6f3d4439 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala @@ -18,17 +18,26 @@ package org.apache.flink.ml.regression -import org.apache.flink.api.scala.DataSet -import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.api.scala._ import org.apache.flink.ml.evaluation.RegressionScores -import org.apache.flink.ml.pipeline.{EvaluateDataSetOperation, Predictor} +import org.apache.flink.ml.pipeline.Predictor +/** Trait that regression algorithms should implement + * + * @tparam Self Type of the implementing class + */ trait Regressor[Self] extends Predictor[Self]{ that: Self => - def score(testing: DataSet[LabeledVector]) - (implicit evaluateOperation: EvaluateDataSetOperation[Self, LabeledVector, Double]): - DataSet[Double] = { - RegressionScores.r2Score.evaluate(this.evaluate[LabeledVector, Double](testing)) + override def calculateScore[Prediction](input: DataSet[(Prediction, Prediction)]) + : DataSet[Double] = { + val tpi = input.getType() + if (tpi == createTypeInformation[(Double, Double)]) { + val doubleInput = input.asInstanceOf[DataSet[(Double, Double)]] + RegressionScores.r2Score.evaluate(doubleInput) + } + else { + throw new UnsupportedOperationException("ALS should have Double predictions") + } } } diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala index 5cde2d9b9d78b..91bcdb065c44c 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala @@ -18,7 +18,6 @@ package org.apache.flink.ml.classification -import org.apache.flink.ml.common.LabeledVector import org.scalatest.{FlatSpec, Matchers} import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector} diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala index ccda72944f773..1c33f9d7cef8b 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala @@ -20,9 +20,6 @@ package org.apache.flink.ml.evaluation import org.apache.flink.api.scala._ -import org.apache.flink.ml.data.ToyData -import org.apache.flink.ml.math.DenseVector -import org.apache.flink.ml.regression.SimpleLeastSquaresRegression import org.apache.flink.test.util.FlinkTestBase import org.scalatest.{FlatSpec, Matchers} @@ -74,27 +71,6 @@ class ScoreTest result.head shouldBe (0.5 +- 1e9) } - it should "work with a slightly more involved case with linear regression" in { - val env = ExecutionEnvironment.getExecutionEnvironment - val center = DenseVector(1.0, -2.0, 3.0, -4.0, 5.0) - val weights = DenseVector(2.0, 1.0, 0.0, -1.0, -2.0) - val n = 1000 - val noise = 0.5 - val ds = env.fromCollection(ToyData.singleGaussianLinearProblem(n, center, weights, noise)) - - val slr = new SimpleLeastSquaresRegression - slr.fit(ds) - - val test = ds.map(x => (x.vector, x.label)) - - val labels = slr.evaluate(test) - - val error = RegressionScores.squaredLoss.evaluate(labels) - val expectedError = noise*noise - - error.collect().head shouldBe (expectedError +- expectedError/5) - } - it should "work for accuracy score" in { val env = ExecutionEnvironment.getExecutionEnvironment diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala index 68b71e7055344..deb2c9467cd14 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala @@ -19,6 +19,7 @@ package org.apache.flink.ml.evaluation import org.apache.flink.api.scala._ import org.apache.flink.ml.common.ParameterMap +import org.apache.flink.ml.preprocessing.StandardScaler import org.apache.flink.ml.regression.RegressionData._ import org.apache.flink.ml.regression.MultipleLinearRegression import org.apache.flink.test.util.FlinkTestBase @@ -29,7 +30,7 @@ class ScorerITSuite extends FlatSpec with Matchers with FlinkTestBase { behavior of "the Scorer class" - it should "work for squared loss" in { + def fixture = new { val env = ExecutionEnvironment.getExecutionEnvironment val loss = RegressionScores.squaredLoss @@ -45,11 +46,49 @@ class ScorerITSuite extends FlatSpec with Matchers with FlinkTestBase { parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001) val inputDS = env.fromCollection(data) - val evaluationDS = inputDS.map(x => (x.vector, x.label)) + } + + it should "work for squared loss" in { + + val f = fixture + + f.mlr.fit(f.inputDS, f.parameters) + + val evaluationDS = f.inputDS.map(x => (x.vector, x.label)) + + val mse = f.scorer.evaluate(evaluationDS, f.mlr).collect().head + + mse should be < 2.0 + } + + it should "be possible to obtain scores for a chained predictor" in { + val f = fixture + + val scaler = StandardScaler() + + val chainedMLR = scaler.chainPredictor(f.mlr) + + chainedMLR.fit(f.inputDS, f.parameters) + + val evaluationDS = f.inputDS.map(x => (x.vector, x.label)) + + val mse = f.scorer.evaluate(evaluationDS, chainedMLR).collect().head + + mse should be < 2.0 + } + + it should "be possible to call score on a chained predictor" in { + val f = fixture + + val scaler = StandardScaler() + + val chainedMLR = scaler.chainPredictor(f.mlr) + + chainedMLR.fit(f.inputDS, f.parameters) - mlr.fit(inputDS, parameters) + val evaluationDS = f.inputDS - val mse = scorer.evaluate(evaluationDS, mlr).collect().head + val mse = chainedMLR.score(evaluationDS).collect().head mse should be < 2.0 } diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala index 2ad310d2500d2..b2f8621bc6267 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala @@ -25,6 +25,7 @@ import org.scalatest._ import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.scala._ import org.apache.flink.test.util.FlinkTestBase +import Recommendation._ class ALSITSuite extends FlatSpec @@ -35,8 +36,8 @@ class ALSITSuite behavior of "The alternating least squares (ALS) implementation" - it should "properly factorize a matrix" in { - import Recommendation._ + def fixture = new { + val env = ExecutionEnvironment.getExecutionEnvironment @@ -50,28 +51,41 @@ class ALSITSuite als.fit(inputDS) - val testData = env.fromCollection(expectedResult.map{ - case (userID, itemID, rating) => (userID, itemID) - }) + val evaluationData = env.fromCollection(expectedResult) + + val testData = evaluationData.map(idsAndRating => (idsAndRating._1 , idsAndRating._2)) + } + + it should "properly factorize a matrix" in { - val predictions = als.predict(testData).collect() + val f = fixture + + val predictions = f.als.predict(f.testData).collect() predictions.length should equal(expectedResult.length) val resultMap = expectedResult map { - case (uID, iID, value) => (uID, iID) -> value + case (uID, iID, rating) => (uID, iID) -> rating } toMap predictions foreach { - case (uID, iID, value) => { - resultMap.isDefinedAt(((uID, iID))) should be(true) + case (uID, iID, rating) => { + resultMap.isDefinedAt((uID, iID)) should be (true) - value should be(resultMap((uID, iID)) +- 0.1) + rating should be(resultMap((uID, iID)) +- 0.1) } } - val risk = als.empiricalRisk(inputDS).collect().apply(0) + val risk = f.als.empiricalRisk(f.inputDS).collect().head + + risk should be (expectedEmpiricalRisk +- 1) + } + + it should "provide a score for the factorization" in { + val f = fixture + + val rmse = f.als.score(f.evaluationData).collect().head - risk should be(expectedEmpiricalRisk +- 1) + rmse should be < 0.01 } } From d9715ed3a6faba78e0b34368425768e826d5a736 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Mon, 6 Jul 2015 10:50:59 +0200 Subject: [PATCH 10/16] Made calculateScore only take DataSet[(Double, Double)] --- .../flink/ml/classification/Classifier.scala | 18 ++++++++---------- .../flink/ml/pipeline/ChainedPredictor.scala | 9 +++++++-- .../apache/flink/ml/pipeline/Predictor.scala | 4 +--- .../apache/flink/ml/recommendation/ALS.scala | 13 ++----------- .../apache/flink/ml/regression/Regressor.scala | 18 ++++++++---------- 5 files changed, 26 insertions(+), 36 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala index 4fdeb2a54c2b0..c92680ef09b6c 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala @@ -29,15 +29,13 @@ import org.apache.flink.ml.pipeline.Predictor trait Classifier[Self] extends Predictor[Self]{ that: Self => - override def calculateScore[Prediction](input: DataSet[(Prediction, Prediction)]) - : DataSet[Double] = { - val tpi = input.getType() - if (tpi == createTypeInformation[(Double, Double)]) { - val doubleInput = input.asInstanceOf[DataSet[(Double, Double)]] - ClassificationScores.accuracyScore.evaluate(doubleInput) - } - else { - throw new UnsupportedOperationException("ALS should have Double predictions") - } + /** Calculates the performance score for the algorithm, given a DataSet of (truth, prediction) + * tuples + * + * @param input A DataSet of (truth, prediction) tuples + * @return A DataSet containing one Double that indicates the score of the predictor + */ + override def calculateScore(input: DataSet[(Double, Double)]): DataSet[Double] = { + ClassificationScores.accuracyScore.evaluate(input) } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala index e4f589e86f38b..b2fe64f82f47f 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala @@ -39,8 +39,13 @@ import org.apache.flink.ml.common.ParameterMap case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](transformer: T, predictor: P) extends Predictor[ChainedPredictor[T, P]]{ - override def calculateScore[Prediction](input: DataSet[(Prediction, Prediction)]) - : DataSet[Double] = { + /** Calculates the performance score for the algorithm, given a DataSet of (truth, prediction) + * tuples + * + * @param input A DataSet of (truth, prediction) tuples + * @return A DataSet containing one Double that indicates the score of the predictor + */ + override def calculateScore(input: DataSet[(Double, Double)]): DataSet[Double] = { predictor.calculateScore(input) } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala index f5227ed5f19ab..dde90690cdf68 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala @@ -101,11 +101,9 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters { * tuples * * @param input A DataSet of (truth, prediction) tuples - * @tparam Prediction The type of the supervised label, for example a numerical class label. * @return A DataSet containing one Double that indicates the score of the predictor */ - private[ml] def calculateScore[Prediction](input: DataSet[(Prediction, Prediction)]): - DataSet[Double] + private[ml] def calculateScore(input: DataSet[(Double, Double)]): DataSet[Double] } object Predictor { diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala index b2d35ef1903d2..e1cdbdf4871f1 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala @@ -261,20 +261,11 @@ class ALS extends Predictor[ALS] { /** Calculates the RMSE for the algorithm, given a DataSet of (truth, prediction) tuples * * @param input A DataSet of (truth, prediction) tuples - * @tparam Prediction The type of the supervised label, for example a numerical rating. * @return A DataSet containing one Double that indicates the RMSE score of the predictor */ - override def calculateScore[Prediction](input: DataSet[(Prediction, Prediction)]) - : DataSet[Double] = { + override def calculateScore(input: DataSet[(Double, Double)]): DataSet[Double] = { // TODO: Move function to a Recommender trait? - val tpi = input.getType() - if (tpi == createTypeInformation[(Double, Double)]) { - val doubleInput = input.asInstanceOf[DataSet[(Double, Double)]] - RegressionScores.squaredLoss.evaluate(doubleInput).map(Math.sqrt(_)) - } - else { - throw new UnsupportedOperationException("ALS should have Double predictions") - } + RegressionScores.squaredLoss.evaluate(input).map(Math.sqrt(_)) } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala index 5ea2d6f3d4439..031097b876899 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala @@ -29,15 +29,13 @@ import org.apache.flink.ml.pipeline.Predictor trait Regressor[Self] extends Predictor[Self]{ that: Self => - override def calculateScore[Prediction](input: DataSet[(Prediction, Prediction)]) - : DataSet[Double] = { - val tpi = input.getType() - if (tpi == createTypeInformation[(Double, Double)]) { - val doubleInput = input.asInstanceOf[DataSet[(Double, Double)]] - RegressionScores.r2Score.evaluate(doubleInput) - } - else { - throw new UnsupportedOperationException("ALS should have Double predictions") - } + /** Calculates the performance score for the algorithm, given a DataSet of (truth, prediction) + * tuples + * + * @param input A DataSet of (truth, prediction) tuples + * @return A DataSet containing one Double that indicates the score of the predictor + */ + override def calculateScore(input: DataSet[(Double, Double)]): DataSet[Double] = { + RegressionScores.r2Score.evaluate(input) } } From 7f1a6da52dfcd47d39c39cee2141112e5c10ddad Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Tue, 7 Jul 2015 10:15:58 +0200 Subject: [PATCH 11/16] Added test for DataSet.mean() --- .../test/scala/org/apache/flink/ml/MLUtilsSuite.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala index 1464d07cc2878..8d4cabcf7dd81 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala @@ -109,4 +109,14 @@ class MLUtilsSuite extends FlatSpec with Matchers with FlinkTestBase { tempFile.delete() } + + it should "correctly find the mean of a DataSet" in { + val env = ExecutionEnvironment.getExecutionEnvironment + + val ds = env.fromCollection(List(5.0, -5.0, 0.0)) + + val mean = ds.mean().collect().head + + mean should be (0.0 +- 1e-9) + } } From edbe3dd9ea48d168f67a9ff231f8373a6aaee38d Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Tue, 7 Jul 2015 14:11:45 +0200 Subject: [PATCH 12/16] Switched from cross to mapWithBcVariable --- .../apache/flink/ml/evaluation/Score.scala | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala index 6dcc360e259ae..7138996cd9a05 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala @@ -94,17 +94,20 @@ object RegressionScores { val ssRes = trueAndPredicted .map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) val ssTot = onlyTrue - .crossWithTiny(meanTruth).map(tp => (tp._1 - tp._2) * (tp._1 - tp._2)).reduce(_ + _) - val r2 = ssRes.crossWithTiny(ssTot).map{resTot => - val ssRes = resTot._1 - val ssTot = resTot._2 - // We avoid dividing by 0 and just assign 0.0 - if (ssTot == 0.0) { - 0.0 - } - else { - 1 - (ssRes / ssTot) - } + .mapWithBcVariable(meanTruth) { + case (truth: Double, meanTruth: Double) => (truth - meanTruth) * (truth - meanTruth) + }.reduce(_ + _) + + val r2 = ssRes + .mapWithBcVariable(ssTot) { + case (ssRes: Double, ssTot: Double) => + // We avoid dividing by 0 and just assign 0.0 + if (ssTot == 0.0) { + 0.0 + } + else { + 1 - (ssRes / ssTot) + } } r2 } From e840c14032f5fea3b476e1a99122eb9125ba5a4f Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Wed, 8 Jul 2015 18:48:27 +0200 Subject: [PATCH 13/16] Addressed multiple PR comments. --- .../scala/org/apache/flink/ml/evaluation/Score.scala | 5 +++-- .../org/apache/flink/ml/evaluation/Scorer.scala | 8 +++----- .../src/main/scala/org/apache/flink/ml/package.scala | 4 +++- .../org/apache/flink/ml/pipeline/Predictor.scala | 12 +++++------- .../org/apache/flink/ml/recommendation/ALS.scala | 6 +++--- .../{ScoreTest.scala => ScoreITSuite.scala} | 11 ++++------- .../apache/flink/ml/evaluation/ScorerITSuite.scala | 1 - .../regression/MultipleLinearRegressionITSuite.scala | 5 +++-- 8 files changed, 24 insertions(+), 28 deletions(-) rename flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/{ScoreTest.scala => ScoreITSuite.scala} (92%) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala index 7138996cd9a05..99ae535ddccca 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala @@ -53,6 +53,7 @@ abstract class MeanScore[PredictionType: TypeInformation: ClassTag]( scoringFct: (PredictionType, PredictionType) => Double) (implicit yyt: TypeInformation[(PredictionType, PredictionType)]) extends Score[PredictionType] with Serializable { + def evaluate(trueAndPredicted: DataSet[(PredictionType, PredictionType)]): DataSet[Double] = { trueAndPredicted.map(yy => scoringFct(yy._1, yy._2)).mean() } @@ -76,8 +77,8 @@ object RegressionScores { * @return a Loss object */ def zeroOneSignumLoss = new MeanScore[Double]({ (y1, y2) => - val sy1 = scala.math.signum(y1) - val sy2 = scala.math.signum(y2) + val sy1 = y1.signum + val sy2 = y2.signum if (sy1 == sy2) 0 else 1 }) with Loss diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Scorer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Scorer.scala index 1190d2f3af569..8de1c7ecee43a 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Scorer.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Scorer.scala @@ -24,20 +24,18 @@ import org.apache.flink.ml.pipeline.Predictor import org.apache.flink.ml.pipeline.EvaluateDataSetOperation //TODO: Need to generalize type of Score (and evaluateOperation) -class Scorer(val metric: Score[Double]) extends WithParameters { +class Scorer(val score: Score[Double]) extends WithParameters { - def evaluate[Testing, PredictorInstance<: Predictor[PredictorInstance]]( + def evaluate[Testing, PredictorInstance <: Predictor[PredictorInstance]]( testing: DataSet[Testing], predictorInstance: PredictorInstance, evaluateParameters: ParameterMap = ParameterMap.Empty) (implicit evaluateOperation: EvaluateDataSetOperation[PredictorInstance, Testing, Double]): DataSet[Double] = { - FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment) val resultingParameters = predictorInstance.parameters ++ evaluateParameters val predictions = predictorInstance.evaluate[Testing, Double](testing, resultingParameters) - //TODO: Use parameters - metric.evaluate(predictions) + score.evaluate(predictions) } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala index c9822fcc4090e..e1cbdb2d43c22 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala @@ -70,8 +70,10 @@ package object ml { dataSet.map(new BroadcastSingleElementMapperWithIteration[T, B, O](dataSet.clean(fun))) .withBroadcastSet(broadcastVariable, "broadcastVariable") } + } - /** Calculates the mean value of a DataSet[T <\: Numeric[T]] + implicit class RichNumericDataSet[T : Numeric](dataSet: DataSet[T]) { + /** Calculates the mean value of a DataSet[T <: Numeric[T] ] * * @return A DataSet[Double] with the mean value as its only element */ diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala index dde90690cdf68..205cfd8614e28 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala @@ -209,17 +209,15 @@ object Predictor { * @tparam Model The model that the calling [[Predictor]] uses for predictions * @return An EvaluateDataSetOperation for LabeledVector */ - implicit def LabeledVectorEvaluateDataSetOperation[ - Instance <: Predictor[Instance], - Model]( - implicit predictOperation: PredictOperation[Instance, Model, FlinkVector, Double]) + implicit def LabeledVectorEvaluateDataSetOperation[Instance <: Predictor[Instance],Model] + (implicit predictOperation: PredictOperation[Instance, Model, FlinkVector, Double]) : EvaluateDataSetOperation[Instance, LabeledVector, Double] = { new EvaluateDataSetOperation[Instance, LabeledVector, Double] { override def evaluateDataSet( instance: Instance, evaluateParameters: ParameterMap, testing: DataSet[LabeledVector]) - : DataSet[(Double, Double)] = { + : DataSet[(Double, Double)] = { val resultingParameters = instance.parameters ++ evaluateParameters val model = predictOperation.getModel(instance, resultingParameters) @@ -233,7 +231,7 @@ object Predictor { } } -/** Trait for the predict operation of [[Predictor]]. This predict operation works on DataSets. +/** Type class for the predict operation of [[Predictor]]. This predict operation works on DataSets. * * [[Predictor]]s either have to implement this trait or the [[PredictOperation]] trait. The * implementation has to be made available as an implicit value or function in the scope of @@ -295,7 +293,7 @@ trait PredictOperation[Instance, Model, Testing, Prediction] extends Serializabl def predict(value: Testing, model: Model): Prediction } -/** Trait for the evaluate operation of [[Predictor]]. This evaluate operation works on +/** Type class for the evaluate operation of [[Predictor]]. This evaluate operation works on * DataSets. * * It takes a [[DataSet]] of some type. For each element of this [[DataSet]] the evaluate method diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala index e1cdbdf4871f1..e53ec356791ed 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala @@ -33,7 +33,6 @@ import org.apache.flink.util.Collector import org.apache.flink.api.common.functions.{Partitioner => FlinkPartitioner, GroupReduceFunction, CoGroupFunction} -// TODO: Use only one BLAS interface import com.github.fommil.netlib.BLAS.{ getInstance => blas } import com.github.fommil.netlib.LAPACK.{ getInstance => lapack } import org.netlib.util.intW @@ -438,13 +437,14 @@ object ALS { override def evaluateDataSet( instance: ALS, evaluateParameters: ParameterMap, - testing: DataSet[(Int, Int, Double)]): DataSet[(Double, Double)] = { + testing: DataSet[(Int, Int, Double)]) + : DataSet[(Double, Double)] = { instance.factorsOption match { case Some((userFactors, itemFactors)) => { testing.join(userFactors, JoinHint.REPARTITION_HASH_SECOND).where(0).equalTo(0) .join(itemFactors, JoinHint.REPARTITION_HASH_SECOND).where("_1._2").equalTo(0).map { tuple => { - val (((uID, iID, truth), uFactors), iFactors) = tuple + val (((_, _, truth), uFactors), iFactors) = tuple val uFactorsVector = uFactors.factors val iFactorsVector = iFactors.factors diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreITSuite.scala similarity index 92% rename from flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala rename to flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreITSuite.scala index 1c33f9d7cef8b..58eabdf440132 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreTest.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreITSuite.scala @@ -24,10 +24,7 @@ import org.apache.flink.test.util.FlinkTestBase import org.scalatest.{FlatSpec, Matchers} -class ScoreTest - extends FlatSpec - with Matchers - with FlinkTestBase { +class ScoreITSuite extends FlatSpec with Matchers with FlinkTestBase { behavior of "Evaluation Score functions" @@ -88,7 +85,7 @@ class ScoreTest val env = ExecutionEnvironment.getExecutionEnvironment // List of 50 (i, i + 1.0) tuples, where i the index - val valueList = Range.Double(0.0, 50.0, 1.0).toList zip Range.Double(0.0, 50.0, 1.0).map(_ + 1) + val valueList = Range.Double(0.0, 50.0, 1.0) zip Range.Double(0.0, 50.0, 1.0).map(_ + 1) val yy = env.fromCollection(valueList) @@ -97,13 +94,13 @@ class ScoreTest val result = r2.evaluate(yy).collect() result.length shouldBe 1 - result.head shouldBe (0.995 +- 1e9) + result.head shouldBe (0.99519807923169268 +- 1e9) } it should "calculate the R2 score correctly for edge cases" in { val env = ExecutionEnvironment.getExecutionEnvironment - // List of 50 (i, i + 1.0) tuples, where i the index + // List of 50 (0.0, 1.0) tuples val valueList = Array.ofDim[Double](50) zip Array.ofDim[Double](50).map(_ + 1.0) val yy = env.fromCollection(valueList) diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala index deb2c9467cd14..260645a79533b 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala @@ -25,7 +25,6 @@ import org.apache.flink.ml.regression.MultipleLinearRegression import org.apache.flink.test.util.FlinkTestBase import org.scalatest.{FlatSpec, Matchers} - class ScorerITSuite extends FlatSpec with Matchers with FlinkTestBase { behavior of "the Scorer class" diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala index 4878de8f9cadb..3fb0a70dffd53 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala @@ -18,6 +18,7 @@ package org.apache.flink.ml.regression +import org.apache.flink.ml._ import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.ml.common.{LabeledVector, WeightVector, ParameterMap} import org.apache.flink.ml.preprocessing.PolynomialFeatures @@ -145,7 +146,7 @@ class MultipleLinearRegressionITSuite parameters.add(MultipleLinearRegression.Stepsize, 10.0) parameters.add(MultipleLinearRegression.Iterations, 100) -// parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.0001) + parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.0001) val inputDS = env.fromCollection(data) val evaluationDS = inputDS @@ -154,6 +155,6 @@ class MultipleLinearRegressionITSuite val r2Score = mlr.score(evaluationDS).collect().head - r2Score should be (expectedR2 +- 0.1) + r2Score should be (expectedR2 +- 0.01) } } From 6e48b612f0a367e798e40590f6921d4dc242f2aa Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Wed, 8 Jul 2015 19:06:42 +0200 Subject: [PATCH 14/16] Add approximatelyEquals for Doubles, used for score calculation. --- .../scala/org/apache/flink/ml/evaluation/Score.scala | 11 ++++++----- .../src/main/scala/org/apache/flink/ml/package.scala | 10 ++++++++++ .../org/apache/flink/ml/evaluation/ScoreITSuite.scala | 4 ++-- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala index 99ae535ddccca..d960cb0b32aa4 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala @@ -119,22 +119,23 @@ object ClassificationScores { /** Calculates the fraction of correct predictions * */ - def accuracyScore = - new MeanScore[Double]((y1, y2) => if (y1 == y2) 1 else 0) with PerformanceScore + def accuracyScore = { + new MeanScore[Double]((y1, y2) => if (y1.approximatelyEquals(y2)) 1 else 0) + with PerformanceScore + } /** * Zero One Loss Function * * returns 1 if outputs differ and 0 if they are equal * - * @tparam T output type * @return a Loss object */ - def zeroOneLoss[T: TypeInformation: ClassTag] = { + def zeroOneLoss = { // TODO: If T == Double, == comparison could be problematic // Also, if we plan to use LabeledVector for all classification tasks, the type parameter can be // removed - new MeanScore[T]((y1, y2) => if (y1 == y2) 0 else 1) with Loss + new MeanScore[Double]((y1, y2) => if (y1.approximatelyEquals(y2)) 0 else 1) with Loss } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala index e1cbdb2d43c22..f02fbb0100ab6 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala @@ -29,6 +29,16 @@ import scala.reflect.ClassTag package object ml { + implicit class RichDouble(double: Double) { + def approximatelyEquals(other: Double, precision: Double = 1e-9): Boolean = { + if (scala.math.abs(double - other) < precision) { + true + } else { + false + } + } + } + /** Pimp my [[ExecutionEnvironment]] to directly support `readLibSVM` * * @param executionEnvironment diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreITSuite.scala index 58eabdf440132..3aaa916d89eae 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScoreITSuite.scala @@ -44,9 +44,9 @@ class ScoreITSuite extends FlatSpec with Matchers with FlinkTestBase { it should "work for zero one loss" in { val env = ExecutionEnvironment.getExecutionEnvironment - val yy = env.fromCollection(Seq("a" -> "a", "a" -> "b", "b" -> "c", "d" -> "d")) + val yy = env.fromCollection(Seq(1.0 -> 1.0, 2.0 -> 2.0, 3.0 -> 4.0, 4.0 -> 5.0)) - val loss = ClassificationScores.zeroOneLoss[String] + val loss = ClassificationScores.zeroOneLoss val result = loss.evaluate(yy).collect() From 57d0ef2c4bc268d1d870c7aab537dd611f464fcf Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Wed, 8 Jul 2015 19:23:14 +0200 Subject: [PATCH 15/16] Improved dostrings for Score --- .../apache/flink/ml/evaluation/Score.scala | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala index d960cb0b32aa4..442f808cd1553 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/evaluation/Score.scala @@ -27,8 +27,8 @@ import scala.reflect.ClassTag /** * Evaluation score * - * Takes a whole data set and then computes the evaluation score on them (obviously, again encoded - * in a DataSet) + * Can be used to calculate a performance score for an algorithm, when provided with a DataSet + * of (truth, prediction) tuples * * @tparam PredictionType output type */ @@ -59,20 +59,23 @@ abstract class MeanScore[PredictionType: TypeInformation: ClassTag]( } } +/** Scores aimed at evaluating the performance of regression algorithms + * + */ object RegressionScores { /** - * Squared loss function + * Mean Squared loss function * - * returns (y1 - y2)' + * Calculates (y1 - y2)^2^ and returns the mean. * * @return a Loss object */ def squaredLoss = new MeanScore[Double]((y1,y2) => (y1 - y2) * (y1 - y2)) with Loss /** - * Zero One Loss Function also usable for score information + * Mean Zero One Loss Function also usable for score information * - * returns 1 if sign of outputs differ and 0 if the signs are equal + * Assigns 1 if sign of outputs differ and 0 if the signs are equal, and returns the mean * * @return a Loss object */ @@ -115,6 +118,9 @@ object RegressionScores { } } +/** Scores aimed at evaluating the performance of classification algorithms + * + */ object ClassificationScores { /** Calculates the fraction of correct predictions * @@ -125,16 +131,13 @@ object ClassificationScores { } /** - * Zero One Loss Function + * Mean Zero One Loss Function * - * returns 1 if outputs differ and 0 if they are equal + * Assigns 1 if outputs differ and 0 if they are equal, and returns the mean. * * @return a Loss object */ def zeroOneLoss = { - // TODO: If T == Double, == comparison could be problematic - // Also, if we plan to use LabeledVector for all classification tasks, the type parameter can be - // removed new MeanScore[Double]((y1, y2) => if (y1.approximatelyEquals(y2)) 0 else 1) with Loss } } From eb66de590947ca8f887a8e52f8c66ec860b82af3 Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Wed, 8 Jul 2015 19:27:32 +0200 Subject: [PATCH 16/16] Removed score function from Predictor. --- .../flink/ml/classification/Classifier.scala | 9 ------- .../flink/ml/pipeline/ChainedPredictor.scala | 13 +--------- .../apache/flink/ml/pipeline/Predictor.scala | 24 ------------------- .../apache/flink/ml/recommendation/ALS.scala | 10 -------- .../flink/ml/regression/Regressor.scala | 9 ------- .../flink/ml/classification/SVMITSuite.scala | 10 -------- .../flink/ml/evaluation/ScorerITSuite.scala | 17 ------------- .../flink/ml/recommendation/ALSITSuite.scala | 8 ------- .../MultipleLinearRegressionITSuite.scala | 24 ------------------- 9 files changed, 1 insertion(+), 123 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala index c92680ef09b6c..92daff5a05391 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/Classifier.scala @@ -29,13 +29,4 @@ import org.apache.flink.ml.pipeline.Predictor trait Classifier[Self] extends Predictor[Self]{ that: Self => - /** Calculates the performance score for the algorithm, given a DataSet of (truth, prediction) - * tuples - * - * @param input A DataSet of (truth, prediction) tuples - * @return A DataSet containing one Double that indicates the score of the predictor - */ - override def calculateScore(input: DataSet[(Double, Double)]): DataSet[Double] = { - ClassificationScores.accuracyScore.evaluate(input) - } } diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala index b2fe64f82f47f..6ab053349b460 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala @@ -37,18 +37,7 @@ import org.apache.flink.ml.common.ParameterMap * @tparam P Type of the trailing [[Predictor]] */ case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](transformer: T, predictor: P) - extends Predictor[ChainedPredictor[T, P]]{ - - /** Calculates the performance score for the algorithm, given a DataSet of (truth, prediction) - * tuples - * - * @param input A DataSet of (truth, prediction) tuples - * @return A DataSet containing one Double that indicates the score of the predictor - */ - override def calculateScore(input: DataSet[(Double, Double)]): DataSet[Double] = { - predictor.calculateScore(input) - } -} + extends Predictor[ChainedPredictor[T, P]] {} object ChainedPredictor{ diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala index 205cfd8614e28..e84f3bcd0d951 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala @@ -80,30 +80,6 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters { FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment) evaluator.evaluateDataSet(this, evaluateParameters, testing) } - - /** Calculates a numerical score for the [[Predictor]] - * - * By convention, higher scores are considered better, so even if a loss is used as a performance - * measure, it will be negated, so that that higher is better. - * @param testing The evaluation DataSet, that contains the features and the true value - * @param evaluateOperation An EvaluateDataSetOperation that produces Double results - * @tparam Testing The type of the features and true value, for example [[LabeledVector]] - * @return A DataSet containing one Double that indicates the score of the predictor - */ - def score[Testing](testing: DataSet[Testing]) - (implicit evaluateOperation: EvaluateDataSetOperation[Self, Testing, Double]): - DataSet[Double] = { - // TODO: Generalized so that we don't necessarily expect Double prediction values - calculateScore(this.evaluate[Testing, Double](testing)) - } - - /** Calculates the performance score for the algorithm, given a DataSet of (truth, prediction) - * tuples - * - * @param input A DataSet of (truth, prediction) tuples - * @return A DataSet containing one Double that indicates the score of the predictor - */ - private[ml] def calculateScore(input: DataSet[(Double, Double)]): DataSet[Double] } object Predictor { diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala index e53ec356791ed..947d26aa4ac9f 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala @@ -256,16 +256,6 @@ class ALS extends Predictor[ALS] { "Prior to predicting values, it has to be trained on data.") } } - - /** Calculates the RMSE for the algorithm, given a DataSet of (truth, prediction) tuples - * - * @param input A DataSet of (truth, prediction) tuples - * @return A DataSet containing one Double that indicates the RMSE score of the predictor - */ - override def calculateScore(input: DataSet[(Double, Double)]): DataSet[Double] = { - // TODO: Move function to a Recommender trait? - RegressionScores.squaredLoss.evaluate(input).map(Math.sqrt(_)) - } } object ALS { diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala index 031097b876899..bc260886b0517 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/Regressor.scala @@ -29,13 +29,4 @@ import org.apache.flink.ml.pipeline.Predictor trait Regressor[Self] extends Predictor[Self]{ that: Self => - /** Calculates the performance score for the algorithm, given a DataSet of (truth, prediction) - * tuples - * - * @param input A DataSet of (truth, prediction) tuples - * @return A DataSet containing one Double that indicates the score of the predictor - */ - override def calculateScore(input: DataSet[(Double, Double)]): DataSet[Double] = { - RegressionScores.r2Score.evaluate(input) - } } diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala index 91bcdb065c44c..5c94f8cacadd4 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala @@ -96,14 +96,4 @@ class SVMITSuite extends FlatSpec with Matchers with FlinkTestBase { rawPrediction should be (15.0 +- 1e-9) } - - it should "correctly calculate its score" in { - val f = fixture - - val test = f.trainingDS - - val score = f.svm.score(test).collect().head - - score should be > 0.9 - } } diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala index 260645a79533b..e53cc43b46cf8 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/evaluation/ScorerITSuite.scala @@ -75,21 +75,4 @@ class ScorerITSuite extends FlatSpec with Matchers with FlinkTestBase { mse should be < 2.0 } - - it should "be possible to call score on a chained predictor" in { - val f = fixture - - val scaler = StandardScaler() - - val chainedMLR = scaler.chainPredictor(f.mlr) - - chainedMLR.fit(f.inputDS, f.parameters) - - val evaluationDS = f.inputDS - - val mse = chainedMLR.score(evaluationDS).collect().head - - mse should be < 2.0 - } - } diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala index b2f8621bc6267..3209ac0fd4c0e 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala @@ -80,12 +80,4 @@ class ALSITSuite risk should be (expectedEmpiricalRisk +- 1) } - - it should "provide a score for the factorization" in { - val f = fixture - - val rmse = f.als.score(f.evaluationData).collect().head - - rmse should be < 0.01 - } } diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala index 3fb0a70dffd53..f26ff6916ef7c 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala @@ -133,28 +133,4 @@ class MultipleLinearRegressionITSuite absoluteErrorSum should be < 50.0 } - - it should "calculate its score correctly" in { - val env = ExecutionEnvironment.getExecutionEnvironment - val expectedR2 = 0.29310994289260195 - - val mlr = MultipleLinearRegression() - - import RegressionData._ - - val parameters = ParameterMap() - - parameters.add(MultipleLinearRegression.Stepsize, 10.0) - parameters.add(MultipleLinearRegression.Iterations, 100) - parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.0001) - - val inputDS = env.fromCollection(data) - val evaluationDS = inputDS - - mlr.fit(inputDS, parameters) - - val r2Score = mlr.score(evaluationDS).collect().head - - r2Score should be (expectedR2 +- 0.01) - } }