From aa18b3dc82475c179f56fad25363184a47c36c2f Mon Sep 17 00:00:00 2001 From: Theodore Vasiloudis Date: Fri, 10 Jul 2015 11:34:32 +0200 Subject: [PATCH] More tests and a new operation for StandardScaler --- .../ml/preprocessing/StandardScaler.scala | 50 ++++++------- .../preprocessing/StandardScalerITSuite.scala | 73 +++++++++++-------- 2 files changed, 64 insertions(+), 59 deletions(-) diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala index c62657f98f1a8..82e8abf4df1b6 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala @@ -145,6 +145,27 @@ object StandardScaler { } } + /** Trains the [[StandardScaler]] by learning the mean and standard deviation of the training + * data which is of type ([[Vector]], Double). The mean and standard deviation are used to + * transform the given input data. + * + */ + implicit def fitLabelVectorTupleStandardScaler + [T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = { + new FitOperation[StandardScaler, (T, Double)] { + override def fit( + instance: StandardScaler, + fitParameters: ParameterMap, + input: DataSet[(T, Double)]) + : Unit = { + val vectorDS = input.map(_._1) + val metrics = extractFeatureMetrics(vectorDS) + + instance.metricsOption = Some(metrics) + } + } + } + /** Calculates in one pass over the data the features' mean and standard deviation. * For the calculation of the Standard deviation with one pass over the data, * the Youngs & Cramer algorithm was used: @@ -240,8 +261,8 @@ object StandardScaler { implicit def transformVectors[T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = { new StandardScalerTransformOperation[T]() { override def transform( - vector: T, - model: (linalg.Vector[Double], linalg.Vector[Double])) + vector: T, + model: (linalg.Vector[Double], linalg.Vector[Double])) : T = { scale(vector, model) } @@ -278,29 +299,4 @@ object StandardScaler { LabeledVector(label, scale(vector, model)) } } - - /** Scales the given vector such that it has the given mean and std - * - * @param vector Vector to be scaled - * @param dataMean Mean of the training data - * @param dataStd Standard deviation of the training data - * @param mean Mean of the scaled data - * @param std Standard deviation of the scaled data - * @tparam T Type of [[Vector]] - * @return Scaled vector - */ - private def scaleVector[T <: Vector: BreezeVectorConverter]( - vector: T, - dataMean: linalg.Vector[Double], - dataStd: linalg.Vector[Double], - mean: Double, - std: Double) - : T = { - var myVector = vector.asBreeze - - myVector -= dataMean - myVector :/= dataStd - myVector = (myVector :* std) + mean - myVector.fromBreeze - } } diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala index 30875b379355c..5cd253d587571 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala @@ -21,7 +21,8 @@ import breeze.linalg import breeze.numerics.sqrt import breeze.numerics.sqrt._ import org.apache.flink.api.scala._ -import org.apache.flink.ml.math.{Vector, DenseVector} +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector} import org.apache.flink.test.util.FlinkTestBase import org.apache.flink.ml.math.Breeze._ import org.scalatest._ @@ -36,15 +37,10 @@ class StandardScalerITSuite import StandardScalerData._ - it should "scale the vectors to have mean equal to 0 and std equal to 1" in { - - val env = ExecutionEnvironment.getExecutionEnvironment - - val dataSet = env.fromCollection(data) - val scaler = StandardScaler() - scaler.fit(dataSet) - val scaledVectors = scaler.transform(dataSet).collect - + def checkVectors( + scaledVectors: Seq[FlinkVector], + expectedMean: Double, + expectedStd: Double): Unit = { scaledVectors.length should equal(data.length) val numberOfFeatures = scaledVectors(0).size @@ -64,11 +60,23 @@ class StandardScalerITSuite scaledStd = sqrt(scaledStd) for (i <- 0 until numberOfFeatures) { - scaledMean(i) should be(0.0 +- (0.0000000000001)) - scaledStd(i) should be(1.0 +- (0.0000000000001)) + scaledMean(i) should be(expectedMean +- 1e-9) + scaledStd(i) should be(expectedStd +- 1e-9) } } + it should "scale the vectors to have mean equal to 0 and std equal to 1" in { + + val env = ExecutionEnvironment.getExecutionEnvironment + + val dataSet = env.fromCollection(data) + val scaler = StandardScaler() + scaler.fit(dataSet) + val scaledVectors = scaler.transform(dataSet).collect() + + checkVectors(scaledVectors, 0.0, 1.0) + } + it should "scale the vectors to have mean equal to 10 and standard deviation equal to 2" in { val env = ExecutionEnvironment.getExecutionEnvironment @@ -76,37 +84,38 @@ class StandardScalerITSuite val dataSet = env.fromCollection(data) val scaler = StandardScaler().setMean(10.0).setStd(2.0) scaler.fit(dataSet) - val scaledVectors = scaler.transform(dataSet).collect + val scaledVectors = scaler.transform(dataSet).collect() - scaledVectors.length should equal(data.length) + checkVectors(scaledVectors, 10.0, 2.0) + } - val numberOfFeatures = scaledVectors(0).size - var scaledMean: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures) - var scaledStd: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures) + it should "work with LabeledVector" in { + val env = ExecutionEnvironment.getExecutionEnvironment - for (vector <- scaledVectors) { - scaledMean += vector.asBreeze - } + val dataSet = env.fromCollection(data).map(v => LabeledVector(1.0, v)) + val scaler = StandardScaler() + scaler.fit(dataSet) + val scaledVectors = scaler.transform(dataSet).map(lv => lv.vector).collect() - scaledMean /= scaledVectors.size.asInstanceOf[Double] + checkVectors(scaledVectors, 0.0, 1.0) + } - for (vector <- scaledVectors) { - val temp = vector.asBreeze - scaledMean - scaledStd += temp :* temp - } - scaledStd /= scaledVectors.size.asInstanceOf[Double] - scaledStd = sqrt(scaledStd) + it should "work with (FlinkVector, Double) tuples" in { + val env = ExecutionEnvironment.getExecutionEnvironment - for (i <- 0 until numberOfFeatures) { - scaledMean(i) should be(10.0 +- (0.0000000000001)) - scaledStd(i) should be(2.0 +- (0.0000000000001)) - } + val dataSet = env.fromCollection(data).map(v => (v, 1.0)) + val scaler = StandardScaler() + scaler.fit(dataSet) + val scaledVectors = scaler.transform(dataSet).map(vl => vl._1).collect() + + checkVectors(scaledVectors, 0.0, 1.0) } } object StandardScalerData { - val data: Seq[Vector] = List(DenseVector(Array(2104.00, 3.00)), + val data: Seq[FlinkVector] = List( + DenseVector(Array(2104.00, 3.00)), DenseVector(Array(1600.00, 3.00)), DenseVector(Array(2400.00, 3.00)), DenseVector(Array(1416.00, 2.00)),