Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,27 @@ object StandardScaler {
}
}

/** Trains the [[StandardScaler]] by learning the mean and standard deviation of the training
* data which is of type ([[Vector]], Double). The mean and standard deviation are used to
* transform the given input data.
*
*/
implicit def fitLabelVectorTupleStandardScaler
[T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = {
new FitOperation[StandardScaler, (T, Double)] {
override def fit(
instance: StandardScaler,
fitParameters: ParameterMap,
input: DataSet[(T, Double)])
: Unit = {
val vectorDS = input.map(_._1)
val metrics = extractFeatureMetrics(vectorDS)

instance.metricsOption = Some(metrics)
}
}
}

/** Calculates in one pass over the data the features' mean and standard deviation.
* For the calculation of the Standard deviation with one pass over the data,
* the Youngs & Cramer algorithm was used:
Expand Down Expand Up @@ -240,8 +261,8 @@ object StandardScaler {
implicit def transformVectors[T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = {
new StandardScalerTransformOperation[T]() {
override def transform(
vector: T,
model: (linalg.Vector[Double], linalg.Vector[Double]))
vector: T,
model: (linalg.Vector[Double], linalg.Vector[Double]))
: T = {
scale(vector, model)
}
Expand Down Expand Up @@ -278,29 +299,4 @@ object StandardScaler {
LabeledVector(label, scale(vector, model))
}
}

/** Scales the given vector such that it has the given mean and std
*
* @param vector Vector to be scaled
* @param dataMean Mean of the training data
* @param dataStd Standard deviation of the training data
* @param mean Mean of the scaled data
* @param std Standard deviation of the scaled data
* @tparam T Type of [[Vector]]
* @return Scaled vector
*/
private def scaleVector[T <: Vector: BreezeVectorConverter](
vector: T,
dataMean: linalg.Vector[Double],
dataStd: linalg.Vector[Double],
mean: Double,
std: Double)
: T = {
var myVector = vector.asBreeze

myVector -= dataMean
myVector :/= dataStd
myVector = (myVector :* std) + mean
myVector.fromBreeze
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import breeze.linalg
import breeze.numerics.sqrt
import breeze.numerics.sqrt._
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.{Vector, DenseVector}
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
import org.apache.flink.test.util.FlinkTestBase
import org.apache.flink.ml.math.Breeze._
import org.scalatest._
Expand All @@ -36,15 +37,10 @@ class StandardScalerITSuite

import StandardScalerData._

it should "scale the vectors to have mean equal to 0 and std equal to 1" in {

val env = ExecutionEnvironment.getExecutionEnvironment

val dataSet = env.fromCollection(data)
val scaler = StandardScaler()
scaler.fit(dataSet)
val scaledVectors = scaler.transform(dataSet).collect

def checkVectors(
scaledVectors: Seq[FlinkVector],
expectedMean: Double,
expectedStd: Double): Unit = {
scaledVectors.length should equal(data.length)

val numberOfFeatures = scaledVectors(0).size
Expand All @@ -64,49 +60,62 @@ class StandardScalerITSuite
scaledStd = sqrt(scaledStd)

for (i <- 0 until numberOfFeatures) {
scaledMean(i) should be(0.0 +- (0.0000000000001))
scaledStd(i) should be(1.0 +- (0.0000000000001))
scaledMean(i) should be(expectedMean +- 1e-9)
scaledStd(i) should be(expectedStd +- 1e-9)
}
}

it should "scale the vectors to have mean equal to 0 and std equal to 1" in {

val env = ExecutionEnvironment.getExecutionEnvironment

val dataSet = env.fromCollection(data)
val scaler = StandardScaler()
scaler.fit(dataSet)
val scaledVectors = scaler.transform(dataSet).collect()

checkVectors(scaledVectors, 0.0, 1.0)
}

it should "scale the vectors to have mean equal to 10 and standard deviation equal to 2" in {

val env = ExecutionEnvironment.getExecutionEnvironment

val dataSet = env.fromCollection(data)
val scaler = StandardScaler().setMean(10.0).setStd(2.0)
scaler.fit(dataSet)
val scaledVectors = scaler.transform(dataSet).collect
val scaledVectors = scaler.transform(dataSet).collect()

scaledVectors.length should equal(data.length)
checkVectors(scaledVectors, 10.0, 2.0)
}

val numberOfFeatures = scaledVectors(0).size
var scaledMean: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures)
var scaledStd: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures)
it should "work with LabeledVector" in {
val env = ExecutionEnvironment.getExecutionEnvironment

for (vector <- scaledVectors) {
scaledMean += vector.asBreeze
}
val dataSet = env.fromCollection(data).map(v => LabeledVector(1.0, v))
val scaler = StandardScaler()
scaler.fit(dataSet)
val scaledVectors = scaler.transform(dataSet).map(lv => lv.vector).collect()

scaledMean /= scaledVectors.size.asInstanceOf[Double]
checkVectors(scaledVectors, 0.0, 1.0)
}

for (vector <- scaledVectors) {
val temp = vector.asBreeze - scaledMean
scaledStd += temp :* temp
}
scaledStd /= scaledVectors.size.asInstanceOf[Double]
scaledStd = sqrt(scaledStd)
it should "work with (FlinkVector, Double) tuples" in {
val env = ExecutionEnvironment.getExecutionEnvironment

for (i <- 0 until numberOfFeatures) {
scaledMean(i) should be(10.0 +- (0.0000000000001))
scaledStd(i) should be(2.0 +- (0.0000000000001))
}
val dataSet = env.fromCollection(data).map(v => (v, 1.0))
val scaler = StandardScaler()
scaler.fit(dataSet)
val scaledVectors = scaler.transform(dataSet).map(vl => vl._1).collect()

checkVectors(scaledVectors, 0.0, 1.0)
}
}

object StandardScalerData {

val data: Seq[Vector] = List(DenseVector(Array(2104.00, 3.00)),
val data: Seq[FlinkVector] = List(
DenseVector(Array(2104.00, 3.00)),
DenseVector(Array(1600.00, 3.00)),
DenseVector(Array(2400.00, 3.00)),
DenseVector(Array(1416.00, 2.00)),
Expand Down