From 63a605a62276e018e3d2ca50820ae5114621f827 Mon Sep 17 00:00:00 2001 From: fobeligi Date: Fri, 5 Jun 2015 23:12:43 +0200 Subject: [PATCH 1/3] [FLINK-1844] Add MinMaxScaler implementation in the preprocessing package, test for the corresponding functionality and documentation. --- docs/libs/ml/minMax_scaler.md | 113 ++++++++ .../flink/ml/preprocessing/MinMaxScaler.scala | 256 ++++++++++++++++++ .../preprocessing/MinMaxScalerITSuite.scala | 128 +++++++++ 3 files changed, 497 insertions(+) create mode 100644 docs/libs/ml/minMax_scaler.md create mode 100644 flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala create mode 100644 flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala diff --git a/docs/libs/ml/minMax_scaler.md b/docs/libs/ml/minMax_scaler.md new file mode 100644 index 0000000000000..2554d1923e2e9 --- /dev/null +++ b/docs/libs/ml/minMax_scaler.md @@ -0,0 +1,113 @@ +--- +mathjax: include +htmlTitle: FlinkML - MinMax Scaler +title: FlinkML - MinMax Scaler +--- + + +* This will be replaced by the TOC +{:toc} + +## Description + + The MinMax scaler scales the given data set, so that all values will lie within a user-specified range [min,max]. + In case the user does not provide a specific minimum and maximum value for the scaling range, the MinMax scaler transforms the features of the input data set to lie in the [0,1] interval. + Given a set of input data $x_1, x_2,... x_n$, with minimum value: + + $$x_{min} = \min(\{x_1, x_2,..., x_n\})$$ + + and maximum value: + + $$x_{max} = \max(\{x_1, x_2,..., x_n\})$$ + +The scaled data set $z_1, z_2,...,z_n$ will be: + + $$z_{i}= \frac{x_{i} - x_{min}}{x_{max} - x_{min}} \left ( max - min \right ) + min$$ + +where $\textit{min}$ and $\textit{max}$ are the user specified minimum and maximum values of the range to scale. + +## Operations + +`MinMaxScaler` is a `Transformer`. +As such, it supports the `fit` and `transform` operations. 
+ +### Fit + +MinMaxScaler is trained on all subtypes of `Vector` or `LabeledVector`: + +* `fit[T <: Vector]: DataSet[T] => Unit` +* `fit: DataSet[LabeledVector] => Unit` + +### Transform + +MinMaxScaler transforms all subtypes of `Vector` or `LabeledVector` into the respective type: + +* `transform[T <: Vector]: DataSet[T] => DataSet[T]` +* `transform: DataSet[LabeledVector] => DataSet[LabeledVector]` + +## Parameters + +The MinMax scaler implementation can be controlled by the following two parameters: + + + + + + + + + + + + + + + + + + + +
ParametersDescription
Min +

+ The minimum value of the range for the scaled data set. (Default value: 0.0) +

+
Std +

+ The maximum value of the range for the scaled data set. (Default value: 1.0) +

+
+ +## Examples + +{% highlight scala %} +// Create MinMax scaler transformer +val minMaxscaler = MinMaxScaler() +.setMin(-1.0) +.setMax(1.0) + +// Obtain data set to be scaled +val dataSet: DataSet[Vector] = ... + +// Learn the minimum and maximum values of the training data +minMaxscaler.fit(dataSet) + +// Scale the provided data set to have min=-1.0 and max=1.0 +val scaledDS = minMaxscaler.transform(dataSet) +{% endhighlight %} diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala new file mode 100644 index 0000000000000..6d9cae1be4103 --- /dev/null +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocessing + +import breeze.linalg +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.ml._ +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} +import org.apache.flink.ml.pipeline.{FitOperation, TransformOperation, Transformer} +import org.apache.flink.ml.preprocessing.MinMaxScaler.{Max, Min} + +import scala.reflect.ClassTag + +/** Scales observations, so that all features are in a user-specified range. + * By default for [[MinMaxScaler]] transformer range = (0,1). + * + * This transformer takes a subtype of [[Vector]] of values and maps it to a + * scaled subtype of [[Vector]] such that each feature lies between a user-specified range. + * + * This transformer can be prepended to all [[Transformer]] and + * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect as input a subtype + * of [[Vector]]. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(data) + * val transformer = MinMaxScaler().setMin(-1.0).setMax(1.0) + * + * transformer.fit(trainingDS) + * val transformedDS = transformer.transform(trainingDS) + * }}} + * + * =Parameters= + * + * - [[Min]]: The minimum value of the range of the transformed data set; by default equal to 0 + * - [[Max]]: The maximum value of the range of the transformed data set; by default + * equal to 1 + */ +class MinMaxScaler extends Transformer[MinMaxScaler] { + + var metricsOption: Option[DataSet[(linalg.Vector[Double], linalg.Vector[Double])]] = None + + /** Sets the minimum for the range of the transformed data + * + * @param min the user-specified minimum value. + * @return the MinMaxScaler instance with its minimum value set to the user-specified value. 
+ */ + def setMin(min: Double): MinMaxScaler = { + parameters.add(Min, min) + this + } + + /** Sets the maximum for the range of the transformed data + * + * @param max the user-specified maximum value. + * @return the MinMaxScaler instance with its maximum value set to the user-specified value. + */ + def setMax(max: Double): MinMaxScaler = { + parameters.add(Max, max) + this + } +} + +object MinMaxScaler { + + // ====================================== Parameters ============================================= + + case object Min extends Parameter[Double] { + override val defaultValue: Option[Double] = Some(0.0) + } + + case object Max extends Parameter[Double] { + override val defaultValue: Option[Double] = Some(1.0) + } + + // ==================================== Factory methods ========================================== + + def apply(): MinMaxScaler = { + new MinMaxScaler() + } + + // ====================================== Operations ============================================= + + /** Trains the [[org.apache.flink.ml.preprocessing.MinMaxScaler]] by learning the minimum and + * maximum of each feature of the training data. These values are used in the transform step + * to transform the given input data. + * + * @tparam T Input data type which is a subtype of [[Vector]] + * @return + */ + implicit def fitVectorMinMaxScaler[T <: Vector] = new FitOperation[MinMaxScaler, T] { + override def fit(instance: MinMaxScaler, fitParameters: ParameterMap, input: DataSet[T]) + : Unit = { + val metrics = extractFeatureMinMaxVectors(input) + + instance.metricsOption = Some(metrics) + } + } + + /** Trains the [[MinMaxScaler]] by learning the minimum and maximum of the features of the + * training data which is of type [[LabeledVector]]. The minimum and maximum are used to + * transform the given input data. 
+ * + */ + implicit val fitLabeledVectorMinMaxScaler = { + new FitOperation[MinMaxScaler, LabeledVector] { + override def fit( + instance: MinMaxScaler, + fitParameters: ParameterMap, + input: DataSet[LabeledVector]) + : Unit = { + val vectorDS = input.map(_.vector) + val metrics = extractFeatureMinMaxVectors(vectorDS) + + instance.metricsOption = Some(metrics) + } + } + } + + /** Calculates in one pass over the data the features' minimum and maximum values. + * + * + * @param dataSet The data set for which we want to calculate the minimum and maximum values. + * @return DataSet containing a single tuple of two vectors (minVector, maxVector). + * The first vector represents the minimum values vector and the second is the maximum + * values vector. + */ + private def extractFeatureMinMaxVectors[T <: Vector](dataSet: DataSet[T]) + : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = { + + val minMax = dataSet.map { + v => (v.asBreeze, v.asBreeze) + }.reduce { + (minMax1, minMax2) => { + + + val tempMinimum = linalg.Vector.zeros[Double](minMax1._1.length) + + for (i <- 0 until minMax1._1.length) { + tempMinimum(i) = if (minMax1._1(i) < minMax2._1(i)) { + minMax1._1(i) + } else { + minMax2._1(i) + } + } + + val tempMaximum = linalg.Vector.zeros[Double](minMax1._2.length) + + for (i <- 0 until minMax1._2.length) { + tempMaximum(i) = if (minMax1._2(i) > minMax2._2(i)) { + minMax1._2(i) + } else { + minMax2._2(i) + } + } + (tempMinimum, tempMaximum) + } + } + minMax + } + + /** [[TransformOperation]] which scales input data of subtype of [[Vector]] with respect to + * the calculated minimum and maximum of the training data. The minimum and maximum + * values of the resulting data is configurable. 
+ * + * @tparam T Type of the input and output data which has to be a subtype of [[Vector]] + * @return + */ + implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag] + = { + new TransformOperation[MinMaxScaler, T, T] { + override def transform( + instance: MinMaxScaler, + transformParameters: ParameterMap, + input: DataSet[T]) + : DataSet[T] = { + + val resultingParameters = instance.parameters ++ transformParameters + val min = resultingParameters(Min) + val max = resultingParameters(Max) + + instance.metricsOption match { + case Some(metrics) => { + input.mapWithBcVariable(metrics) { + (vector, metrics) => { + val (broadcastMin, broadcastMax) = metrics + var myVector = vector.asBreeze + + myVector -= broadcastMin + myVector :/= (broadcastMax - broadcastMin) + myVector = (myVector :* (max - min)) + min + myVector.fromBreeze + } + } + } + + case None => + throw new RuntimeException("The MinMaxScaler has not been fitted to the data. " + + "This is necessary to estimate the minimum and maximum of the data.") + } + } + } + } + + implicit val transformLabeledVectors = { + new TransformOperation[MinMaxScaler, LabeledVector, LabeledVector] { + override def transform(instance: MinMaxScaler, + transformParameters: ParameterMap, + input: DataSet[LabeledVector]): DataSet[LabeledVector] = { + val resultingParameters = instance.parameters ++ transformParameters + val min = resultingParameters(Min) + val max = resultingParameters(Max) + + instance.metricsOption match { + case Some(metrics) => { + input.mapWithBcVariable(metrics) { + (labeledVector, metrics) => { + val (broadcastMax, broadcastMin) = metrics + val LabeledVector(label, vector) = labeledVector + var breezeVector = vector.asBreeze + + breezeVector -= broadcastMin + breezeVector :/= (broadcastMax - broadcastMin) + breezeVector = (breezeVector :* (max - min)) + min + LabeledVector(label, breezeVector.fromBreeze) + } + } + } + + case None => + throw new RuntimeException("The 
MinMaxScaler has not been fitted to the data. " + + "This is necessary to estimate the minimum and maximum of the data.") + } + } + } + } + +} \ No newline at end of file diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala new file mode 100644 index 0000000000000..a415f815a0e45 --- /dev/null +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.ml.preprocessing + +import org.apache.flink.api.scala._ +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.{DenseVector, Vector} +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{FlatSpec, Matchers} + + +class MinMaxScalerITSuite + extends FlatSpec + with Matchers + with FlinkTestBase { + + behavior of "Flink's MinMax Scaler" + + import MinMaxScalerData._ + + it should "scale the vectors' values to be restricted in the (0.0,1.0) range" in { + + val env = ExecutionEnvironment.getExecutionEnvironment + + val dataSet = env.fromCollection(data) + val minMaxScaler = MinMaxScaler() + minMaxScaler.fit(dataSet) + val scaledVectors = minMaxScaler.transform(dataSet).collect + + scaledVectors.length should equal(data.length) + + for (vector <- scaledVectors) { + val test = vector.asBreeze.forall(fv => { + fv >= 0.0 && fv <= 1.0 + }) + test shouldEqual (true) + } + + } + + it should "scale vectors' values in the (-1.0,1.0) range" in { + + val env = ExecutionEnvironment.getExecutionEnvironment + + val dataSet = env.fromCollection(data) + val minMaxScaler = MinMaxScaler().setMin(-1.0).setMax(1.0) + minMaxScaler.fit(dataSet) + val scaledVectors = minMaxScaler.transform(dataSet).collect + + scaledVectors.length should equal(data.length) + + for (vector <- scaledVectors) { + val test = vector.asBreeze.forall(fv => { + fv >= -1.0 && fv <= 1.0 + }) + test shouldEqual (true) + } + + } +} + + +object MinMaxScalerData { + + val data: Seq[Vector] = List(DenseVector(Array(2104.00, 3.00)), + DenseVector(Array(1600.00, 3.00)), + DenseVector(Array(2400.00, 3.00)), + DenseVector(Array(1416.00, 2.00)), + DenseVector(Array(3000.00, 4.00)), + DenseVector(Array(1985.00, 4.00)), + DenseVector(Array(1534.00, 3.00)), + DenseVector(Array(1427.00, 3.00)), + DenseVector(Array(1380.00, 3.00)), + DenseVector(Array(1494.00, 3.00)), + DenseVector(Array(1940.00, 4.00)), + DenseVector(Array(2000.00, 3.00)), + 
DenseVector(Array(1890.00, 3.00)), + DenseVector(Array(4478.00, 5.00)), + DenseVector(Array(1268.00, 3.00)), + DenseVector(Array(2300.00, 4.00)), + DenseVector(Array(1320.00, 2.00)), + DenseVector(Array(1236.00, 3.00)), + DenseVector(Array(2609.00, 4.00)), + DenseVector(Array(3031.00, 4.00)), + DenseVector(Array(1767.00, 3.00)), + DenseVector(Array(1888.00, 2.00)), + DenseVector(Array(1604.00, 3.00)), + DenseVector(Array(1962.00, 4.00)), + DenseVector(Array(3890.00, 3.00)), + DenseVector(Array(1100.00, 3.00)), + DenseVector(Array(1458.00, 3.00)), + DenseVector(Array(2526.00, 3.00)), + DenseVector(Array(2200.00, 3.00)), + DenseVector(Array(2637.00, 3.00)), + DenseVector(Array(1839.00, 2.00)), + DenseVector(Array(1000.00, 1.00)), + DenseVector(Array(2040.00, 4.00)), + DenseVector(Array(3137.00, 3.00)), + DenseVector(Array(1811.00, 4.00)), + DenseVector(Array(1437.00, 3.00)), + DenseVector(Array(1239.00, 3.00)), + DenseVector(Array(2132.00, 4.00)), + DenseVector(Array(4215.00, 4.00)), + DenseVector(Array(2162.00, 4.00)), + DenseVector(Array(1664.00, 2.00)), + DenseVector(Array(2238.00, 3.00)), + DenseVector(Array(2567.00, 4.00)), + DenseVector(Array(1200.00, 3.00)), + DenseVector(Array(852.00, 2.00)), + DenseVector(Array(1852.00, 4.00)), + DenseVector(Array(1203.00, 3.00)) + ) +} \ No newline at end of file From 693618a9d91e0db8faeae3ad0f6d1b16544c38a0 Mon Sep 17 00:00:00 2001 From: fobeligi Date: Sat, 6 Jun 2015 00:51:14 +0200 Subject: [PATCH 2/3] [FLINK-1844] Change second test to use LabeledVectors --- .../flink/ml/preprocessing/MinMaxScaler.scala | 7 +- .../preprocessing/MinMaxScalerITSuite.scala | 64 +++++++++++++++++-- 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala index 6d9cae1be4103..1ebc62c5fa8bc 100644 --- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala @@ -151,7 +151,6 @@ object MinMaxScaler { }.reduce { (minMax1, minMax2) => { - val tempMinimum = linalg.Vector.zeros[Double](minMax1._1.length) for (i <- 0 until minMax1._1.length) { @@ -233,7 +232,7 @@ object MinMaxScaler { case Some(metrics) => { input.mapWithBcVariable(metrics) { (labeledVector, metrics) => { - val (broadcastMax, broadcastMin) = metrics + val (broadcastMin, broadcastMax) = metrics val LabeledVector(label, vector) = labeledVector var breezeVector = vector.asBreeze @@ -241,6 +240,7 @@ object MinMaxScaler { breezeVector :/= (broadcastMax - broadcastMin) breezeVector = (breezeVector :* (max - min)) + min LabeledVector(label, breezeVector.fromBreeze) + } } } @@ -252,5 +252,4 @@ object MinMaxScaler { } } } - -} \ No newline at end of file +} diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala index a415f815a0e45..8ba78c7fc6002 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala @@ -17,7 +17,9 @@ */ package org.apache.flink.ml.preprocessing +import breeze.linalg import org.apache.flink.api.scala._ +import org.apache.flink.ml.common.LabeledVector import org.apache.flink.ml.math.Breeze._ import org.apache.flink.ml.math.{DenseVector, Vector} import org.apache.flink.test.util.FlinkTestBase @@ -57,16 +59,16 @@ class MinMaxScalerITSuite val env = ExecutionEnvironment.getExecutionEnvironment - val dataSet = env.fromCollection(data) + val dataSet = env.fromCollection(data2) val minMaxScaler = MinMaxScaler().setMin(-1.0).setMax(1.0) minMaxScaler.fit(dataSet) 
val scaledVectors = minMaxScaler.transform(dataSet).collect - scaledVectors.length should equal(data.length) + scaledVectors.length should equal(data2.length) - for (vector <- scaledVectors) { - val test = vector.asBreeze.forall(fv => { - fv >= -1.0 && fv <= 1.0 + for (labeledVector <- scaledVectors) { + val test = labeledVector.vector.asBreeze.forall(lv => { + lv >= -1.0 && lv <= 1.0 }) test shouldEqual (true) } @@ -125,4 +127,54 @@ object MinMaxScalerData { DenseVector(Array(1852.00, 4.00)), DenseVector(Array(1203.00, 3.00)) ) -} \ No newline at end of file + + val data2: Seq[LabeledVector] = List ( + LabeledVector(1.0, DenseVector(Array(2104.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(1600.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(2400.00, 3.00))), + LabeledVector(0.0, DenseVector(Array(1416.00, 2.00))), + LabeledVector(1.0, DenseVector(Array(3000.00, 4.00))), + LabeledVector(1.0, DenseVector(Array(1985.00, 4.00))), + LabeledVector(1.0, DenseVector(Array(1534.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(1427.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(1380.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(1494.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(1940.00, 4.00))), + LabeledVector(1.0, DenseVector(Array(2000.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(1890.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(4478.00, 5.00))), + LabeledVector(1.0, DenseVector(Array(1268.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(2300.00, 4.00))), + LabeledVector(0.0, DenseVector(Array(1320.00, 2.00))), + LabeledVector(1.0, DenseVector(Array(1236.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(2609.00, 4.00))), + LabeledVector(1.0, DenseVector(Array(3031.00, 4.00))), + LabeledVector(1.0, DenseVector(Array(1767.00, 3.00))), + LabeledVector(0.0, DenseVector(Array(1888.00, 2.00))), + LabeledVector(1.0, DenseVector(Array(1604.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(1962.00, 4.00))), + LabeledVector(1.0, 
DenseVector(Array(3890.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(1100.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(1458.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(2526.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(2200.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(2637.00, 3.00))), + LabeledVector(0.0, DenseVector(Array(1839.00, 2.00))), + LabeledVector(0.0, DenseVector(Array(1000.00, 1.00))), + LabeledVector(1.0, DenseVector(Array(2040.00, 4.00))), + LabeledVector(1.0, DenseVector(Array(3137.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(1811.00, 4.00))), + LabeledVector(1.0, DenseVector(Array(1437.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(1239.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(2132.00, 4.00))), + LabeledVector(1.0, DenseVector(Array(4215.00, 4.00))), + LabeledVector(1.0, DenseVector(Array(2162.00, 4.00))), + LabeledVector(0.0, DenseVector(Array(1664.00, 2.00))), + LabeledVector(1.0, DenseVector(Array(2238.00, 3.00))), + LabeledVector(1.0, DenseVector(Array(2567.00, 4.00))), + LabeledVector(1.0, DenseVector(Array(1200.00, 3.00))), + LabeledVector(0.0, DenseVector(Array(852.00, 2.00))), + LabeledVector(1.0, DenseVector(Array(1852.00, 4.00))), + LabeledVector(1.0, DenseVector(Array(1203.00, 3.00))) + ) +} From 169d8ddb9a2cf47b49d6562a034b5f39d5655c9b Mon Sep 17 00:00:00 2001 From: fobeligi Date: Mon, 8 Jun 2015 13:01:37 +0200 Subject: [PATCH 3/3] [FLINK-1844] Incorporate PR comments --- docs/libs/ml/minMax_scaler.md | 3 +- .../flink/ml/preprocessing/MinMaxScaler.scala | 47 ++- .../preprocessing/MinMaxScalerITSuite.scala | 269 +++++++++++------- 3 files changed, 190 insertions(+), 129 deletions(-) diff --git a/docs/libs/ml/minMax_scaler.md b/docs/libs/ml/minMax_scaler.md index 2554d1923e2e9..99a5d90931984 100644 --- a/docs/libs/ml/minMax_scaler.md +++ b/docs/libs/ml/minMax_scaler.md @@ -84,7 +84,7 @@ The MinMax scaler implementation can be controlled by the following two paramete - Std + Max

The maximum value of the range for the scaled data set. (Default value: 1.0) @@ -100,7 +100,6 @@ The MinMax scaler implementation can be controlled by the following two paramete // Create MinMax scaler transformer val minMaxscaler = MinMaxScaler() .setMin(-1.0) -.setMax(1.0) // Obtain data set to be scaled val dataSet: DataSet[Vector] = ... diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala index 1ebc62c5fa8bc..7ebc5b66bc8e2 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala @@ -18,6 +18,7 @@ package org.apache.flink.ml.preprocessing import breeze.linalg +import breeze.linalg.{max, min} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.ml._ @@ -30,7 +31,7 @@ import org.apache.flink.ml.preprocessing.MinMaxScaler.{Max, Min} import scala.reflect.ClassTag /** Scales observations, so that all features are in a user-specified range. - * By default for [[MinMaxScaler]] transformer range = (0,1). + * By default for [[MinMaxScaler]] transformer range = [0,1]. * * This transformer takes a subtype of [[Vector]] of values and maps it to a * scaled subtype of [[Vector]] such that each feature lies between a user-specified range. @@ -42,7 +43,7 @@ import scala.reflect.ClassTag * @example * {{{ * val trainingDS: DataSet[Vector] = env.fromCollection(data) - * val transformer = MinMaxScaler().setMin(-1.0).setMax(1.0) + * val transformer = MinMaxScaler().setMin(-1.0) * * transformer.fit(trainingDS) * val transformedDS = transformer.transform(trainingDS) @@ -136,7 +137,6 @@ object MinMaxScaler { } /** Calculates in one pass over the data the features' minimum and maximum values. 
- * * * @param dataSet The data set for which we want to calculate the minimum and maximum values. * @return DataSet containing a single tuple of two vectors (minVector, maxVector). @@ -151,25 +151,9 @@ object MinMaxScaler { }.reduce { (minMax1, minMax2) => { - val tempMinimum = linalg.Vector.zeros[Double](minMax1._1.length) - - for (i <- 0 until minMax1._1.length) { - tempMinimum(i) = if (minMax1._1(i) < minMax2._1(i)) { - minMax1._1(i) - } else { - minMax2._1(i) - } - } - - val tempMaximum = linalg.Vector.zeros[Double](minMax1._2.length) + val tempMinimum = min(minMax1._1, minMax2._1) + val tempMaximum = max(minMax1._2, minMax2._2) - for (i <- 0 until minMax1._2.length) { - tempMaximum(i) = if (minMax1._2(i) > minMax2._2(i)) { - minMax1._2(i) - } else { - minMax2._2(i) - } - } (tempMinimum, tempMaximum) } } @@ -203,8 +187,16 @@ object MinMaxScaler { val (broadcastMin, broadcastMax) = metrics var myVector = vector.asBreeze + //handle the case where a feature takes only one value + val rangePerFeature = (broadcastMax - broadcastMin) + for (i <- 0 until rangePerFeature.size) { + if (rangePerFeature(i) == 0.0) { + rangePerFeature(i)= 1.0 + } + } + myVector -= broadcastMin - myVector :/= (broadcastMax - broadcastMin) + myVector :/= rangePerFeature myVector = (myVector :* (max - min)) + min myVector.fromBreeze } @@ -236,11 +228,18 @@ object MinMaxScaler { val LabeledVector(label, vector) = labeledVector var breezeVector = vector.asBreeze + //handle the case where a feature takes only one value + val rangePerFeature = (broadcastMax - broadcastMin) + for (i <- 0 until rangePerFeature.size) { + if (rangePerFeature(i) == 0.0) { + rangePerFeature(i)= 1.0 + } + } + breezeVector -= broadcastMin - breezeVector :/= (broadcastMax - broadcastMin) + breezeVector :/= rangePerFeature breezeVector = (breezeVector :* (max - min)) + min LabeledVector(label, breezeVector.fromBreeze) - } } } diff --git 
a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala index 8ba78c7fc6002..75ac442af0fb5 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.ml.preprocessing -import breeze.linalg +import breeze.linalg.{max, min} import org.apache.flink.api.scala._ import org.apache.flink.ml.common.LabeledVector import org.apache.flink.ml.math.Breeze._ @@ -35,7 +35,7 @@ class MinMaxScalerITSuite import MinMaxScalerData._ - it should "scale the vectors' values to be restricted in the (0.0,1.0) range" in { + it should "scale the vectors' values to be restricted in the [0.0,1.0] range" in { val env = ExecutionEnvironment.getExecutionEnvironment @@ -46,135 +46,198 @@ class MinMaxScalerITSuite scaledVectors.length should equal(data.length) + //ensure data lies in the user-specified range for (vector <- scaledVectors) { val test = vector.asBreeze.forall(fv => { fv >= 0.0 && fv <= 1.0 }) - test shouldEqual (true) + test shouldEqual true } + var expectedMin = data.head.asBreeze + var expectedMax = data.head.asBreeze + + for (v <- data.tail) { + val tempVector = v.asBreeze + expectedMin = min(expectedMin, tempVector) + expectedMax = max(expectedMax, tempVector) + } + + //ensure that estimated Min and Max vectors equal the expected ones + val estimatedMinMax = minMaxScaler.metricsOption.get.collect() + estimatedMinMax.head shouldEqual(expectedMin, expectedMax) + + //handle the case where a feature takes only one value + val expectedRangePerFeature = (expectedMax - expectedMin) + for (i <- 0 until expectedRangePerFeature.size) { + if (expectedRangePerFeature(i) == 0.0) { + expectedRangePerFeature(i)= 1.0 + } + } + + //ensure that vectors where 
scaled correctly + for (i <- 0 until data.length) { + var expectedVector = data(i).asBreeze - expectedMin + expectedVector :/= expectedRangePerFeature + expectedVector = expectedVector :* (1.0 - 0.0) + + expectedVector.fromBreeze.toSeq should contain theSameElementsInOrderAs scaledVectors(i) + } } - it should "scale vectors' values in the (-1.0,1.0) range" in { + it should "scale vectors' values in the [-1.0,1.0] range" in { val env = ExecutionEnvironment.getExecutionEnvironment - val dataSet = env.fromCollection(data2) - val minMaxScaler = MinMaxScaler().setMin(-1.0).setMax(1.0) + val dataSet = env.fromCollection(labeledData) + val minMaxScaler = MinMaxScaler().setMin(-1.0) minMaxScaler.fit(dataSet) val scaledVectors = minMaxScaler.transform(dataSet).collect - scaledVectors.length should equal(data2.length) + scaledVectors.length should equal(labeledData.length) + //ensure data lies in the user-specified range for (labeledVector <- scaledVectors) { val test = labeledVector.vector.asBreeze.forall(lv => { lv >= -1.0 && lv <= 1.0 }) - test shouldEqual (true) + test shouldEqual true } + var expectedMin = labeledData.head.vector.asBreeze + var expectedMax = labeledData.head.vector.asBreeze + + for (v <- labeledData.tail) { + val tempVector = v.vector.asBreeze + expectedMin = min(expectedMin, tempVector) + expectedMax = max(expectedMax, tempVector) + } + + //ensure that estimated Min and Max vectors equal the expected ones + val estimatedMinMax = minMaxScaler.metricsOption.get.collect() + estimatedMinMax.head shouldEqual(expectedMin, expectedMax) + + //handle the case where a feature takes only one value + val expectedRangePerFeature = (expectedMax - expectedMin) + for (i <- 0 until expectedRangePerFeature.size) { + if (expectedRangePerFeature(i) == 0.0) { + expectedRangePerFeature(i)= 1.0 + } + } + + //ensure that LabeledVectors where scaled correctly + for (i <- 0 until labeledData.length) { + var expectedVector = labeledData(i).vector.asBreeze - expectedMin + 
expectedVector :/= expectedRangePerFeature + expectedVector = (expectedVector :* (1.0 + 1.0)) - 1.0 + + labeledData(i).label shouldEqual scaledVectors(i).label + expectedVector.fromBreeze.toSeq should contain theSameElementsInOrderAs scaledVectors(i) + .vector + } } } object MinMaxScalerData { - val data: Seq[Vector] = List(DenseVector(Array(2104.00, 3.00)), - DenseVector(Array(1600.00, 3.00)), - DenseVector(Array(2400.00, 3.00)), - DenseVector(Array(1416.00, 2.00)), - DenseVector(Array(3000.00, 4.00)), - DenseVector(Array(1985.00, 4.00)), - DenseVector(Array(1534.00, 3.00)), - DenseVector(Array(1427.00, 3.00)), - DenseVector(Array(1380.00, 3.00)), - DenseVector(Array(1494.00, 3.00)), - DenseVector(Array(1940.00, 4.00)), - DenseVector(Array(2000.00, 3.00)), - DenseVector(Array(1890.00, 3.00)), - DenseVector(Array(4478.00, 5.00)), - DenseVector(Array(1268.00, 3.00)), - DenseVector(Array(2300.00, 4.00)), - DenseVector(Array(1320.00, 2.00)), - DenseVector(Array(1236.00, 3.00)), - DenseVector(Array(2609.00, 4.00)), - DenseVector(Array(3031.00, 4.00)), - DenseVector(Array(1767.00, 3.00)), - DenseVector(Array(1888.00, 2.00)), - DenseVector(Array(1604.00, 3.00)), - DenseVector(Array(1962.00, 4.00)), - DenseVector(Array(3890.00, 3.00)), - DenseVector(Array(1100.00, 3.00)), - DenseVector(Array(1458.00, 3.00)), - DenseVector(Array(2526.00, 3.00)), - DenseVector(Array(2200.00, 3.00)), - DenseVector(Array(2637.00, 3.00)), - DenseVector(Array(1839.00, 2.00)), - DenseVector(Array(1000.00, 1.00)), - DenseVector(Array(2040.00, 4.00)), - DenseVector(Array(3137.00, 3.00)), - DenseVector(Array(1811.00, 4.00)), - DenseVector(Array(1437.00, 3.00)), - DenseVector(Array(1239.00, 3.00)), - DenseVector(Array(2132.00, 4.00)), - DenseVector(Array(4215.00, 4.00)), - DenseVector(Array(2162.00, 4.00)), - DenseVector(Array(1664.00, 2.00)), - DenseVector(Array(2238.00, 3.00)), - DenseVector(Array(2567.00, 4.00)), - DenseVector(Array(1200.00, 3.00)), - DenseVector(Array(852.00, 2.00)), - 
DenseVector(Array(1852.00, 4.00)), - DenseVector(Array(1203.00, 3.00)) + val data: Seq[Vector] = List( + DenseVector(Array(2104.00, 3.00, 0.0)), + DenseVector(Array(1600.00, 3.00, 0.0)), + DenseVector(Array(2400.00, 3.00, 0.0)), + DenseVector(Array(1416.00, 2.00, 0.0)), + DenseVector(Array(3000.00, 4.00, 0.0)), + DenseVector(Array(1985.00, 4.00, 0.0)), + DenseVector(Array(1534.00, 3.00, 0.0)), + DenseVector(Array(1427.00, 3.00, 0.0)), + DenseVector(Array(1380.00, 3.00, 0.0)), + DenseVector(Array(1494.00, 3.00, 0.0)), + DenseVector(Array(1940.00, 4.00, 0.0)), + DenseVector(Array(2000.00, 3.00, 0.0)), + DenseVector(Array(1890.00, 3.00, 0.0)), + DenseVector(Array(4478.00, 5.00, 0.0)), + DenseVector(Array(1268.00, 3.00, 0.0)), + DenseVector(Array(2300.00, 4.00, 0.0)), + DenseVector(Array(1320.00, 2.00, 0.0)), + DenseVector(Array(1236.00, 3.00, 0.0)), + DenseVector(Array(2609.00, 4.00, 0.0)), + DenseVector(Array(3031.00, 4.00, 0.0)), + DenseVector(Array(1767.00, 3.00, 0.0)), + DenseVector(Array(1888.00, 2.00, 0.0)), + DenseVector(Array(1604.00, 3.00, 0.0)), + DenseVector(Array(1962.00, 4.00, 0.0)), + DenseVector(Array(3890.00, 3.00, 0.0)), + DenseVector(Array(1100.00, 3.00, 0.0)), + DenseVector(Array(1458.00, 3.00, 0.0)), + DenseVector(Array(2526.00, 3.00, 0.0)), + DenseVector(Array(2200.00, 3.00, 0.0)), + DenseVector(Array(2637.00, 3.00, 0.0)), + DenseVector(Array(1839.00, 2.00, 0.0)), + DenseVector(Array(1000.00, 1.00, 0.0)), + DenseVector(Array(2040.00, 4.00, 0.0)), + DenseVector(Array(3137.00, 3.00, 0.0)), + DenseVector(Array(1811.00, 4.00, 0.0)), + DenseVector(Array(1437.00, 3.00, 0.0)), + DenseVector(Array(1239.00, 3.00, 0.0)), + DenseVector(Array(2132.00, 4.00, 0.0)), + DenseVector(Array(4215.00, 4.00, 0.0)), + DenseVector(Array(2162.00, 4.00, 0.0)), + DenseVector(Array(1664.00, 2.00, 0.0)), + DenseVector(Array(2238.00, 3.00, 0.0)), + DenseVector(Array(2567.00, 4.00, 0.0)), + DenseVector(Array(1200.00, 3.00, 0.0)), + DenseVector(Array(852.00, 2.00, 0.0)), + 
DenseVector(Array(1852.00, 4.00, 0.0)), + DenseVector(Array(1203.00, 3.00, 0.0)) ) - val data2: Seq[LabeledVector] = List ( - LabeledVector(1.0, DenseVector(Array(2104.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(1600.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(2400.00, 3.00))), - LabeledVector(0.0, DenseVector(Array(1416.00, 2.00))), - LabeledVector(1.0, DenseVector(Array(3000.00, 4.00))), - LabeledVector(1.0, DenseVector(Array(1985.00, 4.00))), - LabeledVector(1.0, DenseVector(Array(1534.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(1427.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(1380.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(1494.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(1940.00, 4.00))), - LabeledVector(1.0, DenseVector(Array(2000.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(1890.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(4478.00, 5.00))), - LabeledVector(1.0, DenseVector(Array(1268.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(2300.00, 4.00))), - LabeledVector(0.0, DenseVector(Array(1320.00, 2.00))), - LabeledVector(1.0, DenseVector(Array(1236.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(2609.00, 4.00))), - LabeledVector(1.0, DenseVector(Array(3031.00, 4.00))), - LabeledVector(1.0, DenseVector(Array(1767.00, 3.00))), - LabeledVector(0.0, DenseVector(Array(1888.00, 2.00))), - LabeledVector(1.0, DenseVector(Array(1604.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(1962.00, 4.00))), - LabeledVector(1.0, DenseVector(Array(3890.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(1100.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(1458.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(2526.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(2200.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(2637.00, 3.00))), - LabeledVector(0.0, DenseVector(Array(1839.00, 2.00))), - LabeledVector(0.0, DenseVector(Array(1000.00, 1.00))), - LabeledVector(1.0, DenseVector(Array(2040.00, 
4.00))), - LabeledVector(1.0, DenseVector(Array(3137.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(1811.00, 4.00))), - LabeledVector(1.0, DenseVector(Array(1437.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(1239.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(2132.00, 4.00))), - LabeledVector(1.0, DenseVector(Array(4215.00, 4.00))), - LabeledVector(1.0, DenseVector(Array(2162.00, 4.00))), - LabeledVector(0.0, DenseVector(Array(1664.00, 2.00))), - LabeledVector(1.0, DenseVector(Array(2238.00, 3.00))), - LabeledVector(1.0, DenseVector(Array(2567.00, 4.00))), - LabeledVector(1.0, DenseVector(Array(1200.00, 3.00))), - LabeledVector(0.0, DenseVector(Array(852.00, 2.00))), - LabeledVector(1.0, DenseVector(Array(1852.00, 4.00))), - LabeledVector(1.0, DenseVector(Array(1203.00, 3.00))) + val labeledData: Seq[LabeledVector] = List( + LabeledVector(1.0, DenseVector(Array(2104.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1600.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(2400.00, 3.00, 0.0))), + LabeledVector(0.0, DenseVector(Array(1416.00, 2.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(3000.00, 4.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1985.00, 4.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1534.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1427.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1380.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1494.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1940.00, 4.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(2000.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1890.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(4478.00, 5.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1268.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(2300.00, 4.00, 0.0))), + LabeledVector(0.0, DenseVector(Array(1320.00, 2.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1236.00, 3.00, 0.0))), + LabeledVector(1.0, 
DenseVector(Array(2609.00, 4.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(3031.00, 4.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1767.00, 3.00, 0.0))), + LabeledVector(0.0, DenseVector(Array(1888.00, 2.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1604.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1962.00, 4.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(3890.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1100.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1458.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(2526.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(2200.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(2637.00, 3.00, 0.0))), + LabeledVector(0.0, DenseVector(Array(1839.00, 2.00, 0.0))), + LabeledVector(0.0, DenseVector(Array(1000.00, 1.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(2040.00, 4.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(3137.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1811.00, 4.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1437.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1239.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(2132.00, 4.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(4215.00, 4.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(2162.00, 4.00, 0.0))), + LabeledVector(0.0, DenseVector(Array(1664.00, 2.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(2238.00, 3.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(2567.00, 4.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1200.00, 3.00, 0.0))), + LabeledVector(0.0, DenseVector(Array(852.00, 2.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1852.00, 4.00, 0.0))), + LabeledVector(1.0, DenseVector(Array(1203.00, 3.00, 0.0))) ) }