
[SPARK-19368][MLlib] BlockMatrix.toIndexedRowMatrix() optimization for sparse matrices #16732

Closed · wants to merge 7 commits

Conversation

@uzadude (Contributor) commented Jan 29, 2017

What changes were proposed in this pull request?

Optimization [SPARK-12869] was made for dense matrices but caused a severe performance regression for sparse matrices, because manipulating them element by element is very inefficient. When manipulating sparse matrices in Breeze, it is better to use a VectorBuilder.
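For context, a minimal sketch of the pattern in Breeze (sizes and values here are illustrative, not from the PR): a VectorBuilder accumulates (index, value) pairs cheaply and materializes the sparse vector once at the end, rather than updating a SparseVector in place, which can shift its internal arrays on every write.

    import breeze.linalg.{SparseVector => BSV, VectorBuilder}

    // Illustrative only: accumulate active entries, then build once.
    val cols = 1000000
    val builder = VectorBuilder.zeros[Double](cols)
    builder.add(42, 1.0)     // cheap append per active entry
    builder.add(99999, 2.0)
    val sv: BSV[Double] = builder.toSparseVector // single materialization at the end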

How was this patch tested?

Checked against one of our use cases that, after moving to Spark 2, took 6.5 hours instead of 20 minutes. After this change it is back to 20 minutes.

val wholeVectorBuf = VectorBuilder.zeros[Double](cols)
vectors.foreach { case (blockColIdx: Int, vec: BV[Double]) =>
  val offset = colsPerBlock * blockColIdx
  vec.activeIterator.foreach { case (colIdx: Int, value: Double) => wholeVectorBuf.add(offset + colIdx, value) }
}
Member (review comment):
This line includes more than 100 characters.

@@ -280,17 +278,24 @@ class BlockMatrix @Since("1.3.0") (
}.groupByKey().map { case (rowIdx, vectors) =>
Member (review comment):

At line 276, do we need to call asBreeze on the vector? I think (blockIndex, vector) is fine and avoids the object creation inside asBreeze.
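For reference, a sketch of the allocation being discussed (an assumption about how SparseVector.asBreeze behaves, not verbatim Spark code): the conversion wraps the existing arrays in a new Breeze object on every call.

    import breeze.linalg.{SparseVector => BSV, Vector => BV}
    import org.apache.spark.mllib.linalg.SparseVector

    // Sketch: a fresh wrapper object is allocated per vector, even though the
    // underlying index/value arrays are shared rather than copied.
    def asBreezeSketch(v: SparseVector): BV[Double] =
      new BSV[Double](v.indices, v.values, v.size)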

@kiszk (Member) commented Jan 29, 2017

Could you please add performance results with and without this PR?

@kiszk (Member) commented Jan 29, 2017

Jenkins, retest this please

@uzadude (Contributor, author) commented Jan 29, 2017

Hi,
I removed the .asBreeze call for the sparse case.
Regarding the performance question, for this sample code:

    val n = 20000
    val rndEntryList = GraphTestUtils.getRandomMatrixEntries(n, n, density = 0.001, seed = 123)
      .map { case (i, j, d) => (i, (j, d)) }
    val entries: RDD[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 4)
    val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, Vectors.sparse(n, e._2.toSeq)))
    val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n).toBlockMatrix(1000, 1000).cache()

    mat.blocks.count()

    // 1) conversion via CoordinateMatrix
    val t1 = System.nanoTime()
    println(mat.toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
    val t2 = System.nanoTime()
    println("took: " + (t2 - t1) / 1000 / 1000 + " ms")
    println("============================================================")
    // 2) this PR's optimized direct conversion
    println(mat.toIndexedRowMatrixNew().rows.map(_.vector.numActives).sum())
    val t3 = System.nanoTime()
    println("took: " + (t3 - t2) / 1000 / 1000 + " ms")
    println("============================================================")
    // 3) the current (pre-PR) direct conversion
    //println(BlockMatrixFixedOps.toIndexedRowMatrix(mat).rows.map(_.vector.numActives).sum())
    println(mat.toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
    val t4 = System.nanoTime()
    println("took: " + (t4 - t3) / 1000 / 1000 + " ms")
    println("============================================================")

I get:

took: 1242 ms   (toCoordinateMatrix().toIndexedRowMatrix())
took: 2772 ms   (toIndexedRowMatrixNew(), this PR)
took: 19647 ms  (current toIndexedRowMatrix())
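(GraphTestUtils.getRandomMatrixEntries above is not part of Spark; a minimal stand-in for reproducing the benchmark, under that assumption, could look like this:)

    import scala.collection.mutable.LinkedHashSet
    import scala.util.Random

    // Hypothetical stand-in for the author's private GraphTestUtils helper:
    // draws about rows * cols * density entries at distinct (i, j) positions,
    // so the later Vectors.sparse call sees unique column indices per row.
    def getRandomMatrixEntries(rows: Int, cols: Int, density: Double, seed: Long)
        : Seq[(Int, Int, Double)] = {
      val rnd = new Random(seed)
      val numEntries = (rows.toLong * cols * density).toInt
      val positions = LinkedHashSet.empty[(Int, Int)]
      while (positions.size < numEntries) {
        positions += ((rnd.nextInt(rows), rnd.nextInt(cols)))
      }
      positions.toSeq.map { case (i, j) => (i, j, rnd.nextDouble()) }
    }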

@kiszk (Member) commented Jan 30, 2017

Thank you.

If I understand correctly, matSparse in "toIndexedRowMatrix" does not exercise the new path.
If so, it would be good to add a sparser matrix to validate the new path.

@uzadude (Contributor, author) commented Jan 30, 2017

Not sure I understand: matSparse has the 1/10th nnz needed for line 282:

if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz
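(A sketch of the assumed shape of that dispatch; the names and the exact ratio computation are assumptions about the surrounding code in BlockMatrix.scala, not verbatim PR code:)

    // Assumed shape: the ratio of active entries to total columns in the
    // assembled row decides which construction path runs.
    val numberNonZero = vectors.map(_._2.numActives).sum
    val numberNonZeroPerRow = numberNonZero.toDouble / cols
    if (numberNonZeroPerRow <= 0.1) { // Sparse at 1/10th nnz
      // sparse path: build the row with a VectorBuilder
    } else {
      // dense path: keep the SPARK-12869 dense construction
    }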

@kiszk (Member) commented Jan 30, 2017

Sorry, as you pointed out, matSparse has 0.1 for numberNonZeroPerRow.

@akaltsikis commented

Has the issue been resolved?

@kiszk (Member) commented Apr 19, 2017

Jenkins, test this please

@akaltsikis commented

Hey guys, I implemented this as an external function in a jar to work with Spark 2.1.1. Even though @uzadude improved the performance of the 2.x implementation by creating two separate cases for sparse and dense matrices, BlockMatrix.toCoordinateMatrix().toIndexedRowMatrix() still runs faster according to my recent benchmarks. I would suggest we use that path when numberNonZeroPerRow < 0.1, until the sparse-matrix function outperforms the double conversion; see the sketch below.
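A sketch of the suggested dispatch (a hypothetical wrapper, not code from this PR):

    import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, IndexedRowMatrix}

    // Hypothetical: route very sparse matrices through the CoordinateMatrix
    // conversion, everything else through the direct path.
    def toIndexedRows(mat: BlockMatrix, nnzPerRowFraction: Double): IndexedRowMatrix =
      if (nnzPerRowFraction < 0.1) mat.toCoordinateMatrix().toIndexedRowMatrix()
      else mat.toIndexedRowMatrix()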

@srowen (Member) commented Nov 14, 2018

Looks good @uzadude; just saw this very old PR. However, what about @akaltsikis's comment?

@akaltsikis commented

> Looks good @uzadude; just saw this very old PR. However, what about @akaltsikis's comment?

@srowen To be honest, after a year and a half I really can't recall many details.
My guess: given the better performance of BlockMatrix.toCoordinateMatrix().toIndexedRowMatrix() for matrices sparser than numberNonZeroPerRow < 0.1, we should use that instead of the current function, as it should give the same result in half the time.

@uzadude (Contributor, author) commented Nov 15, 2018

After running some more experiments, I was able to reduce the runtime by another factor of 1.5x. Now "toCoordinateMatrix().toIndexedRowMatrix()" is only slightly better, and only in the extreme case where the block matrix size is somewhat misconfigured (as above: 1000x1000 blocks with density 1/1000), meaning many rows contain only a single value; there the gain comes purely from shuffling primitives instead of Vectors. So I generally think this approach is better.
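A quick back-of-the-envelope for the benchmark configuration above shows why that case is degenerate:

    // Worked numbers from the benchmark in this thread
    // (n = 20000, density = 0.001, 1000x1000 blocks):
    val n = 20000; val density = 0.001; val blockSize = 1000
    val nnzPerMatrixRow = n * density                  // ~20 non-zeros per full row
    val colBlocks = n / blockSize                      // 20 column blocks
    val nnzPerBlockRow = nnzPerMatrixRow / colBlocks   // ~1 value per row per block

With roughly one active value per 1000-column block row, almost all per-row work is overhead, which is exactly where shuffling bare primitives wins.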

@akaltsikis commented

> After running some more experiments, I was able to reduce the runtime by another factor of 1.5x. [...]

Agreed. Good job as always, my friend @uzadude.

@SparkQA commented Nov 15, 2018

Test build #4426 has finished for PR 16732 at commit 5531806.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

val arrBufferIndices = new ArrayBuffer[Int](numberNonZero)
val arrBufferValues = new ArrayBuffer[Double](numberNonZero)

vectors.foreach { case (blockColIdx: Int, vec: SparseVector) =>
Member (review comment):
I think this has to be Vector and not SparseVector
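A sketch of that loop with the suggested type (names taken from the diff above; foreachActive is the Spark Vector method that covers both dense and sparse implementations):

    import org.apache.spark.mllib.linalg.Vector

    // Match on Vector, not SparseVector: dense blocks can reach this code too.
    vectors.foreach { case (blockColIdx: Int, vec: Vector) =>
      val offset = colsPerBlock * blockColIdx
      vec.foreachActive { (colIdx: Int, value: Double) =>
        arrBufferIndices += offset + colIdx
        arrBufferValues += value
      }
    }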

-        blockRowIdx * rowsPerBlock + rowIdx -> ((blockColIdx, vector.asBreeze))
-      }
+        blockRowIdx * rowsPerBlock + rowIdx -> ((blockColIdx, vector))
+      }.filter(_._2._2.size > 0)
Member (review comment):
I suppose you could filter just before the map too, but it won't matter much.
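(A sketch of that alternative ordering, with names assumed from the diff rather than taken from the final patch:)

    // Filter out empty row vectors before keying them, instead of after the map.
    mat.rowIter.zipWithIndex
      .filter { case (vector, _) => vector.size > 0 }
      .map { case (vector, rowIdx) =>
        blockRowIdx * rowsPerBlock + rowIdx -> ((blockColIdx, vector))
      }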

Member (review comment):
You can drop the second filter now, right?

@uzadude (Contributor, author) commented Nov 15, 2018

@srowen - you're right. I've fixed it.

@srowen (Member) left a review:

Otherwise fine.


@SparkQA commented Nov 22, 2018

Test build #4438 has finished for PR 16732 at commit 540b261.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 22, 2018

Test build #4439 has finished for PR 16732 at commit 540b261.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) commented Nov 22, 2018

Merged to master

@asfgit closed this in d81d95a on Nov 22, 2018
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request on Feb 18, 2019

[SPARK-19368][MLlib] BlockMatrix.toIndexedRowMatrix() optimization for sparse matrices

## What changes were proposed in this pull request?

Optimization [SPARK-12869] was made for dense matrices but caused a severe performance regression for sparse matrices, because manipulating them element by element is very inefficient. When manipulating sparse matrices in Breeze, it is better to use a VectorBuilder.

## How was this patch tested?

Checked against one of our use cases that, after moving to Spark 2, took 6.5 hours instead of 20 minutes. After this change it is back to 20 minutes.

Closes apache#16732 from uzadude/SparseVector_optimization.

Authored-by: oraviv <oraviv@paypal.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>