[SPARK-4581][MLlib] Refactorize StandardScaler to improve the transformation performance

The following optimizations improve the transformation performance of the
StandardScaler model.

1) Convert the Breeze dense vector to a primitive array to reduce overhead.
2) Since the mean can potentially be a sparse vector, explicitly convert it to a dense primitive array.
3) Keep local references to the `shift` and `factor` arrays so the JVM can load each value without an accessor method call.
4) In the pattern matching, use MLlib's SparseVector/DenseVector instead of Breeze's vectors to
make the codebase cleaner.
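
Points 1) and 2) can be illustrated with a minimal, self-contained sketch. `SparseVec` and `ScalerArrays` are hypothetical stand-ins introduced only for this example (they are not MLlib classes); the `factorOf` loop follows the `factor` computation shown in the diff.

```scala
// Hypothetical stand-in for a sparse vector; NOT the MLlib SparseVector.
final case class SparseVec(size: Int, indices: Array[Int], values: Array[Double]) {
  // Expand into a dense primitive array; entries not listed in `indices` stay 0.0.
  def toArray: Array[Double] = {
    val out = new Array[Double](size)
    var k = 0
    while (k < indices.length) {
      out(indices(k)) = values(k)
      k += 1
    }
    out
  }
}

// Hypothetical helper object, used only for illustration.
object ScalerArrays {
  // 1 / sqrt(variance) per feature, or 0.0 where the variance is zero.
  def factorOf(variance: Array[Double]): Array[Double] = {
    val f = new Array[Double](variance.length)
    var i = 0
    while (i < f.length) {
      f(i) = if (variance(i) != 0.0) 1.0 / math.sqrt(variance(i)) else 0.0
      i += 1
    }
    f
  }
}
```

Once both `shift` and `factor` are plain `Array[Double]` values, the inner loop only performs primitive array loads and stores.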

Benchmark with mnist8m dataset:

Before,
DenseVector withMean and withStd: 50.97secs
DenseVector withMean and withoutStd: 42.11secs
DenseVector withoutMean and withStd: 8.75secs
SparseVector withoutMean and withStd: 5.437secs

With this PR,
DenseVector withMean and withStd: 5.76secs
DenseVector withMean and withoutStd: 5.28secs
DenseVector withoutMean and withStd: 5.30secs
SparseVector withoutMean and withStd: 1.27secs

Note that without the local reference copies of the `factor` and `shift` arrays,
the dense-vector runtime is roughly three times slower:

DenseVector withMean and withStd: 18.15secs
DenseVector withMean and withoutStd: 18.05secs
DenseVector withoutMean and withStd: 18.54secs
SparseVector withoutMean and withStd: 2.01secs
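
To compare the configurations, the speedup of each row can be computed as the "before" time divided by the "with this PR" time. The sketch below only restates the figures listed above; the `Speedups` object and its member names are hypothetical.

```scala
// Hypothetical helper; the numbers are copied from the benchmark figures above.
object Speedups {
  // (configuration, seconds before, seconds with this PR)
  val timings: Seq[(String, Double, Double)] = Seq(
    ("DenseVector withMean and withStd", 50.97, 5.76),
    ("DenseVector withMean and withoutStd", 42.11, 5.28),
    ("DenseVector withoutMean and withStd", 8.75, 5.30),
    ("SparseVector withoutMean and withStd", 5.437, 1.27)
  )

  // Ratio of the old runtime to the new runtime.
  def speedup(before: Double, after: Double): Double = before / after

  def main(args: Array[String]): Unit =
    timings.foreach { case (name, before, after) =>
      println(f"$name%-40s ${speedup(before, after)}%.2fx")
    }
}
```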

The following code,
```scala
while (i < size) {
   values(i) = (values(i) - shift(i)) * factor(i)
   i += 1
}
```
will generate the bytecode
```
   L13
    LINENUMBER 106 L13
   FRAME FULL [org/apache/spark/mllib/feature/StandardScalerModel org/apache/spark/mllib/linalg/Vector org/apache/spark/mllib/linalg/Vector org/apache/spark/mllib/linalg/DenseVector T [D I I] []
    ILOAD 7
    ILOAD 6
    IF_ICMPGE L14
   L15
    LINENUMBER 107 L15
    ALOAD 5
    ILOAD 7
    ALOAD 5
    ILOAD 7
    DALOAD
    ALOAD 0
    INVOKESPECIAL org/apache/spark/mllib/feature/StandardScalerModel.shift ()[D
    ILOAD 7
    DALOAD
    DSUB
    ALOAD 0
    INVOKESPECIAL org/apache/spark/mllib/feature/StandardScalerModel.factor ()[D
    ILOAD 7
    DALOAD
    DMUL
    DASTORE
   L16
    LINENUMBER 108 L16
    ILOAD 7
    ICONST_1
    IADD
    ISTORE 7
    GOTO L13
```
With local references to the `shift` and `factor` arrays, the bytecode becomes
```
   L14
    LINENUMBER 107 L14
    ALOAD 0
    INVOKESPECIAL org/apache/spark/mllib/feature/StandardScalerModel.factor ()[D
    ASTORE 9
   L15
    LINENUMBER 108 L15
   FRAME FULL [org/apache/spark/mllib/feature/StandardScalerModel org/apache/spark/mllib/linalg/Vector [D org/apache/spark/mllib/linalg/Vector org/apache/spark/mllib/linalg/DenseVector T [D I I [D] []
    ILOAD 8
    ILOAD 7
    IF_ICMPGE L16
   L17
    LINENUMBER 109 L17
    ALOAD 6
    ILOAD 8
    ALOAD 6
    ILOAD 8
    DALOAD
    ALOAD 2
    ILOAD 8
    DALOAD
    DSUB
    ALOAD 9
    ILOAD 8
    DALOAD
    DMUL
    DASTORE
   L18
    LINENUMBER 110 L18
    ILOAD 8
    ICONST_1
    IADD
    ISTORE 8
    GOTO L15
```

You can see that with local references, both arrays are held in local variables of the stack frame, so the JVM can load them with `ALOAD` instead of calling the accessor via `INVOKESPECIAL` on every iteration.
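
The two loop shapes compared above can be reproduced in a small, self-contained sketch. `ScalerSketch` is a hypothetical class written for this illustration, not the actual `StandardScalerModel`; it assumes the mean and variance are already dense arrays. Both methods return the same result; they differ only in how often the `shift` and `factor` accessors are called.

```scala
// Hypothetical class for illustration; NOT the MLlib StandardScalerModel.
final class ScalerSketch(mean: Array[Double], variance: Array[Double]) {
  // Member values; Scala reaches them through generated accessor methods.
  private lazy val shift: Array[Double] = mean.clone()
  private lazy val factor: Array[Double] =
    variance.map(v => if (v != 0.0) 1.0 / math.sqrt(v) else 0.0)

  // Calls the `shift` and `factor` accessors on every iteration.
  def transformViaMembers(input: Array[Double]): Array[Double] = {
    val values = input.clone()
    val size = values.length
    var i = 0
    while (i < size) {
      values(i) = (values(i) - shift(i)) * factor(i)
      i += 1
    }
    values
  }

  // Calls each accessor once, then works on local array references.
  def transformViaLocals(input: Array[Double]): Array[Double] = {
    val localShift = shift
    val localFactor = factor
    val values = input.clone()
    val size = values.length
    var i = 0
    while (i < size) {
      values(i) = (values(i) - localShift(i)) * localFactor(i)
      i += 1
    }
    values
  }
}
```

How large the difference is in practice depends on the JVM and on what the JIT compiler manages to inline, so it is worth measuring on the target workload rather than assuming the figures above carry over.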

Author: DB Tsai <dbtsai@alpinenow.com>

Closes apache#3435 from dbtsai/standardscaler and squashes the following commits:

85885a9 [DB Tsai] revert to have lazy in shift array.
daf2b06 [DB Tsai] Address the feedback
cdb5cef [DB Tsai] small change
9c51eef [DB Tsai] style
fc795e4 [DB Tsai] update
5bffd3d [DB Tsai] first commit
DB Tsai authored and mengxr committed Nov 25, 2014
1 parent 69cd53e commit bf1a6aa
Showing 1 changed file with 50 additions and 20 deletions.
@@ -17,11 +17,9 @@

 package org.apache.spark.mllib.feature

-import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
-
 import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.rdd.RDDFunctions._
 import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
 import org.apache.spark.rdd.RDD
@@ -77,8 +75,8 @@ class StandardScalerModel private[mllib] (

   require(mean.size == variance.size)

-  private lazy val factor: BDV[Double] = {
-    val f = BDV.zeros[Double](variance.size)
+  private lazy val factor: Array[Double] = {
+    val f = Array.ofDim[Double](variance.size)
     var i = 0
     while (i < f.size) {
       f(i) = if (variance(i) != 0.0) 1.0 / math.sqrt(variance(i)) else 0.0
@@ -87,6 +85,11 @@ class StandardScalerModel private[mllib] (
     f
   }

+  // Since `shift` will be only used in `withMean` branch, we have it as
+  // `lazy val` so it will be evaluated in that branch. Note that we don't
+  // want to create this array multiple times in `transform` function.
+  private lazy val shift: Array[Double] = mean.toArray
+
   /**
    * Applies standardization transformation on a vector.
    *
@@ -97,30 +100,57 @@ class StandardScalerModel private[mllib] (
   override def transform(vector: Vector): Vector = {
     require(mean.size == vector.size)
     if (withMean) {
-      vector.toBreeze match {
-        case dv: BDV[Double] =>
-          val output = vector.toBreeze.copy
-          var i = 0
-          while (i < output.length) {
-            output(i) = (output(i) - mean(i)) * (if (withStd) factor(i) else 1.0)
-            i += 1
+      // By default, Scala generates Java methods for member variables. So every time when
+      // the member variables are accessed, `invokespecial` will be called which is expensive.
+      // This can be avoid by having a local reference of `shift`.
+      val localShift = shift
+      vector match {
+        case dv: DenseVector =>
+          val values = dv.values.clone()
+          val size = values.size
+          if (withStd) {
+            // Having a local reference of `factor` to avoid overhead as the comment before.
+            val localFactor = factor
+            var i = 0
+            while (i < size) {
+              values(i) = (values(i) - localShift(i)) * localFactor(i)
+              i += 1
+            }
+          } else {
+            var i = 0
+            while (i < size) {
+              values(i) -= localShift(i)
+              i += 1
+            }
           }
-          Vectors.fromBreeze(output)
+          Vectors.dense(values)
         case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
       }
     } else if (withStd) {
-      vector.toBreeze match {
-        case dv: BDV[Double] => Vectors.fromBreeze(dv :* factor)
-        case sv: BSV[Double] =>
+      // Having a local reference of `factor` to avoid overhead as the comment before.
+      val localFactor = factor
+      vector match {
+        case dv: DenseVector =>
+          val values = dv.values.clone()
+          val size = values.size
+          var i = 0
+          while(i < size) {
+            values(i) *= localFactor(i)
+            i += 1
+          }
+          Vectors.dense(values)
+        case sv: SparseVector =>
           // For sparse vector, the `index` array inside sparse vector object will not be changed,
           // so we can re-use it to save memory.
-          val output = new BSV[Double](sv.index, sv.data.clone(), sv.length)
+          val indices = sv.indices
+          val values = sv.values.clone()
+          val nnz = values.size
           var i = 0
-          while (i < output.data.length) {
-            output.data(i) *= factor(output.index(i))
+          while (i < nnz) {
+            values(i) *= localFactor(indices(i))
             i += 1
           }
-          Vectors.fromBreeze(output)
+          Vectors.sparse(sv.size, indices, values)
         case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
       }
     } else {
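
The sparse branch of the change scales only the stored values and reuses the index array. A minimal sketch of that idea, independent of Spark, could look as follows; `SparseScaling.scale` is a hypothetical helper that represents a sparse vector as a plain pair of `indices` and `values` arrays.

```scala
// Hypothetical helper for illustration; a sparse vector is modelled here
// as parallel `indices` / `values` arrays rather than an MLlib SparseVector.
object SparseScaling {
  // Multiplies each stored value by the factor of its feature index.
  // Returns the SAME `indices` instance (it is never mutated) and a new values array.
  def scale(
      indices: Array[Int],
      values: Array[Double],
      factor: Array[Double]): (Array[Int], Array[Double]) = {
    val scaled = values.clone() // leave the caller's values untouched
    val nnz = scaled.length
    var i = 0
    while (i < nnz) {
      scaled(i) *= factor(indices(i))
      i += 1
    }
    (indices, scaled)
  }
}
```

Scaling by a factor keeps zero entries at zero, which is why this branch can stay sparse. In the diff above, the `withMean` branch matches only `DenseVector` and throws `IllegalArgumentException` for other vector types.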