[SPARK-11826][MLlib] Refactor add() and subtract() methods #9916

Closed
wants to merge 6 commits into from
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg.distributed

import scala.collection.mutable.ArrayBuffer

-import breeze.linalg.{DenseMatrix => BDM}
+import breeze.linalg.{DenseMatrix => BDM, Matrix => BM}

import org.apache.spark.{Logging, Partitioner, SparkException}
import org.apache.spark.annotation.Since
@@ -317,40 +317,69 @@ class BlockMatrix @Since("1.3.0") (
}

/**
- * Adds two block matrices together. The matrices must have the same size and matching
- * `rowsPerBlock` and `colsPerBlock` values. If one of the blocks that are being added are
- * instances of [[SparseMatrix]], the resulting sub matrix will also be a [[SparseMatrix]], even
- * if it is being added to a [[DenseMatrix]]. If two dense matrices are added, the output will
- * also be a [[DenseMatrix]].
+ * For given matrices `this` and `other` of compatible dimensions and compatible block dimensions,
+ * it applies a binary function on their corresponding blocks.
+ *
+ * @param other The second BlockMatrix argument for the operator specified by `binMap`
+ * @param binMap A function taking two breeze matrices and returning a breeze matrix
+ * @return A [[BlockMatrix]] whose blocks are the results of a specified binary map on blocks
+ *         of `this` and `other`.
*/
-@Since("1.3.0")
-def add(other: BlockMatrix): BlockMatrix = {
+private[mllib] def blockMap(
+    other: BlockMatrix,
+    binMap: (BM[Double], BM[Double]) => BM[Double]): BlockMatrix = {
require(numRows() == other.numRows(), "Both matrices must have the same number of rows. " +
s"A.numRows: ${numRows()}, B.numRows: ${other.numRows()}")
require(numCols() == other.numCols(), "Both matrices must have the same number of columns. " +
s"A.numCols: ${numCols()}, B.numCols: ${other.numCols()}")
if (rowsPerBlock == other.rowsPerBlock && colsPerBlock == other.colsPerBlock) {
-    val addedBlocks = blocks.cogroup(other.blocks, createPartitioner())
+    val newBlocks = blocks.cogroup(other.blocks, createPartitioner())
.map { case ((blockRowIndex, blockColIndex), (a, b)) =>
if (a.size > 1 || b.size > 1) {
throw new SparkException("There are multiple MatrixBlocks with indices: " +
s"($blockRowIndex, $blockColIndex). Please remove them.")
}
if (a.isEmpty) {
-        new MatrixBlock((blockRowIndex, blockColIndex), b.head)
+        val zeroBlock = BM.zeros[Double](b.head.numRows, b.head.numCols)
Member:
This is making an explicit matrix of all 0s. It would be better to create a sparse matrix of all zeros to avoid the memory allocation.

Member:
I would have said use CSCMatrix.zeros(rows, cols). What do you mean by "then would have to add every zero for later operations?"

Contributor Author:
The problem now is that there are no implicits for adding and subtracting CSCMatrix and BM. So should I leave that as is, then?

Contributor Author:
@jkbradley so I'll modify the doc (as mentioned below) and commit. I don't think there's a better option available for zero sparse matrices!
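The zero-block trick discussed in this thread can be illustrated without Spark or Breeze. The sketch below is a pure-Scala model, not the PR's actual code: `Block` stands in for a Breeze matrix, and all names are illustrative. It shows both the memory point (a dense all-zeros block allocates every entry up front) and why zero-padding the missing side keeps subtraction correct, since `0 - b == -b`.

```scala
// Pure-Scala sketch of the zero-block handling; `Block` is an illustrative
// stand-in for a Breeze matrix, not Spark's actual types.
object ZeroBlockSketch {
  type Block = Array[Double]

  // Dense all-zeros block: eagerly allocates n doubles, which is the
  // memory cost the reviewer is flagging.
  def denseZeros(n: Int): Block = Array.fill(n)(0.0)

  // When `this` has no block at (i, j) but `other` does, applying binMap to
  // an all-zeros left operand keeps the operator semantics: 0 - b == -b.
  def combineMissingLeft(b: Block, binMap: (Block, Block) => Block): Block =
    binMap(denseZeros(b.length), b)

  // Element-wise subtraction on the stand-in blocks.
  val subtract: (Block, Block) => Block =
    (x, y) => x.zip(y).map { case (u, v) => u - v }
}
```

For example, `ZeroBlockSketch.combineMissingLeft(Array(1.0, 2.0), ZeroBlockSketch.subtract)` produces the negated block `Array(-1.0, -2.0)`, which is what `this - other` requires when `this` has no block at that index.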

val result = binMap(zeroBlock, b.head.toBreeze)
new MatrixBlock((blockRowIndex, blockColIndex), Matrices.fromBreeze(result))
} else if (b.isEmpty) {
new MatrixBlock((blockRowIndex, blockColIndex), a.head)
Member:
Since this line is not changed, I'd add a note to the blockMap documentation warning that it ONLY works for addition and subtraction. E.g., it will not work for the operator (a, b) => -a + b.

Contributor Author:
Sure, that's a good idea. But perhaps in another JIRA I'll implement, for example, multiplying every element by a Double, and also the blockwise or elementwise operations which are missing now. Other suggestions?

Member:
Those operations will be good, but they may have to wait on more discussions about local linear algebra (just to warn you).

Contributor Author:
I see. Thanks for the heads up!
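The limitation raised in this thread can be made concrete with a scalar model of blockMap's missing-block handling. This is a sketch, not Spark's API: blocks are `Option[Double]`, where `None` means "no block stored". The point is that blockMap is only safe for operators f with f(a, 0) == a, because when `other` has no matching block, `this`'s block is passed through unchanged rather than going through binMap.

```scala
// Scalar model of blockMap's missing-block branches; names are illustrative.
object BlockMapSketch {
  def blockMapScalar(
      a: Option[Double],
      b: Option[Double],
      binMap: (Double, Double) => Double): Double =
    (a, b) match {
      case (None, Some(bv))     => binMap(0.0, bv) // left side zero-padded
      case (Some(av), None)     => av              // passed through as-is!
      case (Some(av), Some(bv)) => binMap(av, bv)
      case (None, None)         => 0.0
    }
}
```

With f = (x, y) => -x + y, a = 3.0 present and b missing (i.e. zero), the correct result would be -3.0, but the pass-through branch returns 3.0. Addition and subtraction are unaffected because a + 0 == a - 0 == a, which is exactly the documentation warning suggested above.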

} else {
-          val result = a.head.toBreeze + b.head.toBreeze
+          val result = binMap(a.head.toBreeze, b.head.toBreeze)
new MatrixBlock((blockRowIndex, blockColIndex), Matrices.fromBreeze(result))
}
}
-    new BlockMatrix(addedBlocks, rowsPerBlock, colsPerBlock, numRows(), numCols())
+    new BlockMatrix(newBlocks, rowsPerBlock, colsPerBlock, numRows(), numCols())
} else {
-      throw new SparkException("Cannot add matrices with different block dimensions")
+      throw new SparkException("Cannot perform on matrices with different block dimensions")
}
}

/**
* Adds the given block matrix `other` to `this` block matrix: `this + other`.
* The matrices must have the same size and matching `rowsPerBlock` and `colsPerBlock`
* values. If one of the blocks that are being added are instances of [[SparseMatrix]],
* the resulting sub matrix will also be a [[SparseMatrix]], even if it is being added
* to a [[DenseMatrix]]. If two dense matrices are added, the output will also be a
* [[DenseMatrix]].
*/
@Since("1.3.0")
def add(other: BlockMatrix): BlockMatrix =
blockMap(other, (x: BM[Double], y: BM[Double]) => x + y)

/**
* Subtracts the given block matrix `other` from `this` block matrix: `this - other`.
* The matrices must have the same size and matching `rowsPerBlock` and `colsPerBlock`
* values. If one of the blocks that are being subtracted are instances of [[SparseMatrix]],
* the resulting sub matrix will also be a [[SparseMatrix]], even if it is being subtracted
* from a [[DenseMatrix]]. If two dense matrices are subtracted, the output will also be a
* [[DenseMatrix]].
*/
@Since("2.0.0")
def subtract(other: BlockMatrix): BlockMatrix =
blockMap(other, (x: BM[Double], y: BM[Double]) => x - y)

/** Block (i,j) --> Set of destination partitions */
private type BlockDestinations = Map[(Int, Int), Set[Int]]

@@ -192,6 +192,49 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(sparseBM.add(sparseBM).toBreeze() === sparseBM.add(denseBM).toBreeze())
}

test("subtract") {
val blocks: Seq[((Int, Int), Matrix)] = Seq(
((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
((2, 0), new DenseMatrix(1, 2, Array(1.0, 0.0))), // Added block that doesn't exist in A
((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
val rdd = sc.parallelize(blocks, numPartitions)
val B = new BlockMatrix(rdd, rowPerPart, colPerPart)

val expected = BDM(
(0.0, 0.0, 0.0, 0.0),
(0.0, 0.0, 0.0, 0.0),
(0.0, 0.0, 0.0, 0.0),
(0.0, 0.0, 0.0, 0.0),
(-1.0, 0.0, 0.0, 0.0))

val AsubtractB = gridBasedMat.subtract(B)
assert(AsubtractB.numRows() === m)
assert(AsubtractB.numCols() === B.numCols())
assert(AsubtractB.toBreeze() === expected)

val C = new BlockMatrix(rdd, rowPerPart, colPerPart, m, n + 1) // columns don't match
intercept[IllegalArgumentException] {
gridBasedMat.subtract(C)
}
val largerBlocks: Seq[((Int, Int), Matrix)] = Seq(
((0, 0), new DenseMatrix(4, 4, new Array[Double](16))),
((1, 0), new DenseMatrix(1, 4, Array(1.0, 0.0, 1.0, 5.0))))
val C2 = new BlockMatrix(sc.parallelize(largerBlocks, numPartitions), 4, 4, m, n)
intercept[SparkException] { // partitioning doesn't match
gridBasedMat.subtract(C2)
}
// subtracting BlockMatrices composed of SparseMatrices
val sparseBlocks = for (i <- 0 until 4) yield ((i / 2, i % 2), SparseMatrix.speye(4))
val denseBlocks = for (i <- 0 until 4) yield ((i / 2, i % 2), DenseMatrix.eye(4))
val sparseBM = new BlockMatrix(sc.makeRDD(sparseBlocks, 4), 4, 4, 8, 8)
val denseBM = new BlockMatrix(sc.makeRDD(denseBlocks, 4), 4, 4, 8, 8)

assert(sparseBM.subtract(sparseBM).toBreeze() === sparseBM.subtract(denseBM).toBreeze())
}

test("multiply") {
// identity matrix
val blocks: Seq[((Int, Int), Matrix)] = Seq(