Skip to content

Commit

Permalink
[SPARK-26158][MLLIB] fix covariance accuracy problem for DenseVector
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?
Enhance accuracy of the covariance logic in RowMatrix for function computeCovariance

## How was this patch tested?
Unit test
Accuracy test

Closes #23126 from KyleLi1985/master.

Authored-by: 李亮 <liang.li.work@outlook.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
  • Loading branch information
KyleLi1985 authored and srowen committed Nov 29, 2018
1 parent 1144df3 commit 9fdc7a8
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,77 @@ class RowMatrix @Since("1.0.0") (
RowMatrix.triuToFull(n, GU.data)
}

private def computeDenseVectorCovariance(mean: Vector, n: Int, m: Long): Matrix = {

val bc = rows.context.broadcast(mean)

// Computes n*(n+1)/2, avoiding overflow in the multiplication.
// This succeeds when n <= 65535, which is checked above
val nt = if (n % 2 == 0) ((n / 2) * (n + 1)) else (n * ((n + 1) / 2))

val MU = rows.treeAggregate(new BDV[Double](nt))(
seqOp = (U, v) => {

val n = v.size
val na = Array.ofDim[Double](n)
val means = bc.value

val ta = v.toArray
for (index <- 0 until n) {
na(index) = ta(index) - means(index)
}

BLAS.spr(1.0, new DenseVector(na), U.data)
U
}, combOp = (U1, U2) => U1 += U2)

bc.destroy()

val M = RowMatrix.triuToFull(n, MU.data).asBreeze

var i = 0
var j = 0
val m1 = m - 1.0
while (i < n) {
j = i
while (j < n) {
val Mij = M(i, j) / m1
M(i, j) = Mij
M(j, i) = Mij
j += 1
}
i += 1
}

Matrices.fromBreeze(M)
}

private def computeSparseVectorCovariance(mean: Vector, n: Int, m: Long): Matrix = {

// We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is
// large but Cov(X, Y) is small, but it is good for sparse computation.
// TODO: find a fast and stable way for sparse data.
val G = computeGramianMatrix().asBreeze

var i = 0
var j = 0
val m1 = m - 1.0
var alpha = 0.0
while (i < n) {
alpha = m / m1 * mean(i)
j = i
while (j < n) {
val Gij = G(i, j) / m1 - alpha * mean(j)
G(i, j) = Gij
G(j, i) = Gij
j += 1
}
i += 1
}

Matrices.fromBreeze(G)
}

private def checkNumColumns(cols: Int): Unit = {
if (cols > 65535) {
throw new IllegalArgumentException(s"Argument with more than 65535 cols: $cols")
Expand Down Expand Up @@ -337,29 +408,11 @@ class RowMatrix @Since("1.0.0") (
" Cannot compute the covariance of a RowMatrix with <= 1 row.")
val mean = summary.mean

// We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is
// large but Cov(X, Y) is small, but it is good for sparse computation.
// TODO: find a fast and stable way for sparse data.

val G = computeGramianMatrix().asBreeze

var i = 0
var j = 0
val m1 = m - 1.0
var alpha = 0.0
while (i < n) {
alpha = m / m1 * mean(i)
j = i
while (j < n) {
val Gij = G(i, j) / m1 - alpha * mean(j)
G(i, j) = Gij
G(j, i) = Gij
j += 1
}
i += 1
if (rows.first().isInstanceOf[DenseVector]) {
computeDenseVectorCovariance(mean, n, m)
} else {
computeSparseVectorCovariance(mean, n, m)
}

Matrices.fromBreeze(G)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.mllib.linalg.DenseVector;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
import org.apache.spark.sql.Dataset;
Expand Down Expand Up @@ -67,7 +66,7 @@ public void testPCA() {
JavaRDD<Vector> dataRDD = jsc.parallelize(points, 2);

RowMatrix mat = new RowMatrix(dataRDD.map(
(Vector vector) -> (org.apache.spark.mllib.linalg.Vector) new DenseVector(vector.toArray())
(Vector vector) -> org.apache.spark.mllib.linalg.Vectors.fromML(vector)
).rdd());

Matrix pc = mat.computePrincipalComponents(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,20 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
}
}

test("dense vector covariance accuracy (SPARK-26158)") {
val denseData = Seq(
Vectors.dense(100000.000004, 199999.999999),
Vectors.dense(100000.000012, 200000.000002),
Vectors.dense(99999.9999931, 200000.000003),
Vectors.dense(99999.9999977, 200000.000001)
)
val denseMat = new RowMatrix(sc.parallelize(denseData, 2))

val result = denseMat.computeCovariance()
val expected = breeze.linalg.cov(denseMat.toBreeze())
assert(closeToZero(abs(expected) - abs(result.asBreeze.asInstanceOf[BDM[Double]])))
}

test("compute covariance") {
for (mat <- Seq(denseMat, sparseMat)) {
val result = mat.computeCovariance()
Expand Down

0 comments on commit 9fdc7a8

Please sign in to comment.