From 63027ec46e6471ca65196883f8da598b700dd366 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 29 Jan 2015 18:42:31 -0800
Subject: [PATCH 1/3] [SPARK-3976] Added repartitioning for BlockMatrix

---
 .../linalg/distributed/BlockMatrix.scala      | 127 +++++++++++++++++-
 .../linalg/distributed/BlockMatrixSuite.scala |  57 ++++++++
 2 files changed, 179 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 426dbf4805d5f..10c7a8e139d28 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -17,15 +17,15 @@

 package org.apache.spark.mllib.linalg.distributed

-import scala.collection.mutable.ArrayBuffer
-
 import breeze.linalg.{DenseMatrix => BDM}

 import org.apache.spark.{Logging, Partitioner}
-import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel

+import scala.collection.mutable.ArrayBuffer
+
 /**
  * A grid partitioner, which uses a regular grid to partition coordinates.
  *
@@ -45,8 +45,8 @@ private[mllib] class GridPartitioner(
   require(rowsPerPart > 0)
   require(colsPerPart > 0)

-  private val rowPartitions = math.ceil(rows / rowsPerPart).toInt
-  private val colPartitions = math.ceil(cols / colsPerPart).toInt
+  private val rowPartitions = math.ceil(rows * 1.0 / rowsPerPart).toInt
+  private val colPartitions = math.ceil(cols * 1.0 / colsPerPart).toInt

   override val numPartitions = rowPartitions * colPartitions

@@ -237,4 +237,121 @@ class BlockMatrix(
     val localMat = toLocalMatrix()
     new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
   }
+
+  /**
+   * Forms [[MatrixBlock]]s using `rowsPerBlock` and `colsPerBlock`, which were provided in
+   * the constructor. A new [[BlockMatrix]] is returned even if all [[MatrixBlock]]s (except
+   * the ones on the right and bottom edges) already have dimensions `rowsPerBlock`x`colsPerBlock`.
+   */
+  def repartition(): BlockMatrix = {
+    repartition(rowsPerBlock, colsPerBlock)
+  }
+
+  /**
+   * Forms [[MatrixBlock]]s using the provided dimensions `newRowsPerBlock`x`newColsPerBlock`.
+   * A new [[BlockMatrix]] is returned even if all [[MatrixBlock]]s (except the ones on the
+   * right and bottom edges) already have dimensions `newRowsPerBlock`x`newColsPerBlock`.
+   * The resulting number of partitions may differ from the current number of partitions.
+   *
+   * @param newRowsPerBlock Number of rows that make up each MatrixBlock.
+   * @param newColsPerBlock Number of columns that make up each MatrixBlock.
+   * @return The repartitioned BlockMatrix
+   */
+  def repartition(newRowsPerBlock: Int, newColsPerBlock: Int): BlockMatrix = {
+    repartition(newRowsPerBlock, newColsPerBlock, blocks.partitions.length)
+  }
+
+  /**
+   * Forms [[MatrixBlock]]s using the provided dimensions `newRowsPerBlock`x`newColsPerBlock`.
+   * A new [[BlockMatrix]] is returned even if all [[MatrixBlock]]s (except the ones on the
+   * right and bottom edges) already have dimensions `newRowsPerBlock`x`newColsPerBlock`.
+   * The resulting number of partitions may differ from `suggestedNumPartitions`. Assumes
+   * that the offset of each block is
+   * (`blockRowIndex`x`rowsPerBlock`, `blockColIndex`x`colsPerBlock`).
+   *
+   * If there are any [[MatrixBlock]]s with dimensions greater than `rowsPerBlock`x`colsPerBlock`,
+   * the data outside this range will be added to the data of the neighboring [[MatrixBlock]]s.
+   *
+   * @param newRowsPerBlock Number of rows that make up each block.
+   * @param newColsPerBlock Number of columns that make up each block.
+   * @param suggestedNumPartitions Number of partitions to use for the underlying RDD. The
+   *                               final number of partitions may not equal this number.
+   * @return The repartitioned BlockMatrix
+   */
+  def repartition(
+      newRowsPerBlock: Int,
+      newColsPerBlock: Int,
+      suggestedNumPartitions: Int): BlockMatrix = {
+    require(newRowsPerBlock > 0,
+      s"newRowsPerBlock must be greater than 0. newRowsPerBlock: $newRowsPerBlock")
+    require(newColsPerBlock > 0,
+      s"newColsPerBlock must be greater than 0. newColsPerBlock: $newColsPerBlock")
+    require(suggestedNumPartitions > 0, s"suggestedNumPartitions must be greater than 0. " +
+      s"suggestedNumPartitions: $suggestedNumPartitions")
+    val m = numRows()
+    val n = numCols()
+    val newNumRowBlocks = math.ceil(m * 1.0 / newRowsPerBlock).toInt
+    val newNumColBlocks = math.ceil(n * 1.0 / newColsPerBlock).toInt
+    val slicedBlocks = blocks.flatMap { case ((blockRowIndex, blockColIndex), mat) =>
+      val rowStartOffset = blockRowIndex * rowsPerBlock
+      val rowEndOffset = rowStartOffset + mat.numRows
+      val colStartOffset = blockColIndex * colsPerBlock
+      val colEndOffset = colStartOffset + mat.numCols
+
+      // The range of indices that the parts of this block are going to be mapped to
+      val rowIndex = rowStartOffset / newRowsPerBlock
+      val endRowIndex = math.ceil(rowEndOffset * 1.0 / newRowsPerBlock).toInt
+      val colIndex = colStartOffset / newColsPerBlock
+      val endColIndex = math.ceil(colEndOffset * 1.0 / newColsPerBlock).toInt
+
+      // The (Int, Int) key corresponds to the index in the grid that this block now belongs to
+      // In (Int, Int, Matrix), the first Int is the row offset that the subBlock will have
+      // in the new block it's going to be a part of. The second Int is the column offset.
+      val subBlocks = new ArrayBuffer[((Int, Int), (Int, Int, Matrix))](
+        (endRowIndex - rowIndex) * (endColIndex - colIndex))
+
+      var colIdx = colIndex
+      while (colIdx < endColIndex) {
+        var rowIdx = rowIndex
+        while (rowIdx < endRowIndex) {
+          // The indices to slice from the matrix
+          val sliceRowStart =
+            math.max(rowStartOffset, rowIdx * newRowsPerBlock) - rowStartOffset
+          val sliceRowEnd =
+            math.min(rowEndOffset, (rowIdx + 1) * newRowsPerBlock) - rowStartOffset
+          val sliceColStart =
+            math.max(colStartOffset, colIdx * newColsPerBlock) - colStartOffset
+          val sliceColEnd =
+            math.min(colEndOffset, (colIdx + 1) * newColsPerBlock) - colStartOffset
+          // slice matrix
+          val slicedMat = mat.toBreeze(sliceRowStart until sliceRowEnd,
+            sliceColStart until sliceColEnd).toDenseMatrix
+          subBlocks.append(((rowIdx, colIdx), (
+            sliceRowStart + rowStartOffset - rowIdx * newRowsPerBlock,
+            sliceColStart + colStartOffset - colIdx * newColsPerBlock,
+            Matrices.fromBreeze(slicedMat))))
+          rowIdx += 1
+        }
+        colIdx += 1
+      }
+      subBlocks
+    }
+    val newPartitioner = GridPartitioner(newNumRowBlocks, newNumColBlocks, suggestedNumPartitions)
+    val newMatrixBlocksRDD: RDD[MatrixBlock] = slicedBlocks.groupByKey(newPartitioner).
+      map { case ((blockRowIndex, blockColIndex), subBlocks) =>
+        // TODO: When SparseMatrices are supported for operations like multiply, optimize the
+        // following in terms of storage
+        val effRowsPerBlock = math.min(m - blockRowIndex * newRowsPerBlock, newRowsPerBlock).toInt
+        val effColsPerBlock = math.min(n - blockColIndex * newColsPerBlock, newColsPerBlock).toInt
+        val finalBlock = DenseMatrix.zeros(effRowsPerBlock, effColsPerBlock)
+        val values = finalBlock.values
+        subBlocks.foreach { case (rowOffset, colOffset, subBlock) =>
+          subBlock.foreachActive { (i, j, v) =>
+            values((j + colOffset) * effRowsPerBlock + rowOffset + i) += v
+          }
+        }
+        ((blockRowIndex, blockColIndex), finalBlock)
+      }
+    new BlockMatrix(newMatrixBlocksRDD, newRowsPerBlock, newColsPerBlock, m, n)
+  }
 }

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 7284d03d243f5..b8db1997d2b73 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -146,4 +146,61 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
     assert(gridBasedMat.toLocalMatrix() === dense)
     assert(gridBasedMat.toBreeze() === expected)
   }
+
+  test("repartition") {
+    val brzA = gridBasedMat.toBreeze()
+    val threeBythree = gridBasedMat.repartition(3, 3)
+    assert(threeBythree.rowsPerBlock === 3)
+    assert(threeBythree.colsPerBlock === 3)
+    assert(threeBythree.numRows() === m)
+    assert(threeBythree.numCols() === n)
+    assert(threeBythree.numRowBlocks === 2)
+    assert(threeBythree.numColBlocks === 2)
+    assert(threeBythree.toBreeze() === brzA)
+
+    val twoByOne = threeBythree.repartition(2, 1)
+    assert(twoByOne.rowsPerBlock === 2)
+    assert(twoByOne.colsPerBlock === 1)
+    assert(twoByOne.numRows() === m)
+    assert(twoByOne.numCols() === n)
+    assert(twoByOne.numRowBlocks === 3)
+    assert(twoByOne.numColBlocks === 4)
+    assert(twoByOne.toBreeze() === brzA)
+
+    // If two blocks 'overlap', add them together
+    val overlappingBlocks: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), new DenseMatrix(2, 3, Array(1.0, 0.0, 0.0, 2.0, 0.0, 1.0))),
+      ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+      ((1, 0), new DenseMatrix(2, 3, Array(3.0, 0.0, 1.0, 1.0, 1.0, 2.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+      ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+
+    val expected = BDM(
+      (1.0, 0.0, 0.0, 0.0),
+      (0.0, 2.0, 2.0, 0.0),
+      (3.0, 1.0, 2.0, 0.0),
+      (0.0, 1.0, 4.0, 1.0),
+      (0.0, 0.0, 1.0, 5.0))
+
+    val overlap = new BlockMatrix(sc.parallelize(overlappingBlocks, numPartitions),
+      rowPerPart, colPerPart).repartition()
+
+    assert(overlap.numRows() === m)
+    assert(overlap.numCols() === n)
+    assert(overlap.rowsPerBlock === rowPerPart)
+    assert(overlap.colsPerBlock === colPerPart)
+    assert(overlap.numRowBlocks === 3)
+    assert(overlap.numColBlocks === 2)
+    assert(overlap.toBreeze() === expected)
+
+    intercept[IllegalArgumentException] {
+      gridBasedMat.repartition(0, 1)
+    }
+    intercept[IllegalArgumentException] {
+      gridBasedMat.repartition(1, 0)
+    }
+    intercept[IllegalArgumentException] {
+      gridBasedMat.repartition(1, 1, -1)
+    }
+  }
 }

From f947723a898ccbb801e21f032c7fe7ba5b0797d7 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 29 Jan 2015 23:59:26 -0800
Subject: [PATCH 2/3] 10 line rewrite of 40 line solution

---
 .../linalg/distributed/BlockMatrix.scala | 40 +++++++++++++------
 1 file changed, 27 insertions(+), 13 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 10c7a8e139d28..c84c23549425e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.linalg.distributed

 import breeze.linalg.{DenseMatrix => BDM}

 import org.apache.spark.{Logging, Partitioner}
-import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
+import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrices, Matrix}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel

@@ -294,10 +294,11 @@ class BlockMatrix(
     val newNumColBlocks = math.ceil(n * 1.0 / newColsPerBlock).toInt
     val slicedBlocks = blocks.flatMap { case ((blockRowIndex, blockColIndex), mat) =>
       val rowStartOffset = blockRowIndex * rowsPerBlock
-      val rowEndOffset = rowStartOffset + mat.numRows
+      //val rowEndOffset = rowStartOffset + mat.numRows
       val colStartOffset = blockColIndex * colsPerBlock
-      val colEndOffset = colStartOffset + mat.numCols
+      //val colEndOffset = colStartOffset + mat.numCols

+      /*
       // The range of indices that the parts of this block are going to be mapped to
       val rowIndex = rowStartOffset / newRowsPerBlock
       val endRowIndex = math.ceil(rowEndOffset * 1.0 / newRowsPerBlock).toInt
@@ -307,8 +308,10 @@ class BlockMatrix(
       // The (Int, Int) key corresponds to the index in the grid that this block now belongs to
       // In (Int, Int, Matrix), the first Int is the row offset that the subBlock will have
       // in the new block it's going to be a part of. The second Int is the column offset.
+
+
       val subBlocks = new ArrayBuffer[((Int, Int), (Int, Int, Matrix))](
-        (endRowIndex - rowIndex) * (endColIndex - colIndex))
+          (endRowIndex - rowIndex) * (endColIndex - colIndex))

       var colIdx = colIndex
       while (colIdx < endColIndex) {
@@ -316,16 +319,17 @@ class BlockMatrix(
         while (rowIdx < endRowIndex) {
           // The indices to slice from the matrix
           val sliceRowStart =
-            math.max(rowStartOffset, rowIdx * newRowsPerBlock) - rowStartOffset
+              math.max(rowStartOffset, rowIdx * newRowsPerBlock) - rowStartOffset
           val sliceRowEnd =
-            math.min(rowEndOffset, (rowIdx + 1) * newRowsPerBlock) - rowStartOffset
+              math.min(rowEndOffset, (rowIdx + 1) * newRowsPerBlock) - rowStartOffset
           val sliceColStart =
-            math.max(colStartOffset, colIdx * newColsPerBlock) - colStartOffset
+              math.max(colStartOffset, colIdx * newColsPerBlock) - colStartOffset
           val sliceColEnd =
-            math.min(colEndOffset, (colIdx + 1) * newColsPerBlock) - colStartOffset
+              math.min(colEndOffset, (colIdx + 1) * newColsPerBlock) - colStartOffset
           // slice matrix
           val slicedMat = mat.toBreeze(sliceRowStart until sliceRowEnd,
             sliceColStart until sliceColEnd).toDenseMatrix
+
           subBlocks.append(((rowIdx, colIdx), (
             sliceRowStart + rowStartOffset - rowIdx * newRowsPerBlock,
             sliceColStart + colStartOffset - colIdx * newColsPerBlock,
@@ -335,20 +339,30 @@ class BlockMatrix(
         colIdx += 1
       }
       subBlocks
+      */
+      val values = new ArrayBuffer[((Int, Int), (Int, Int, Double))]()
+      mat.foreachActive { (i, j, v) =>
+        val targetBlockRowIndex = (rowStartOffset + i) / newRowsPerBlock
+        val targetBlockColIndex = (colStartOffset + j) / newColsPerBlock
+        val targetRowOffset = (rowStartOffset + i) - newRowsPerBlock * targetBlockRowIndex
+        val targetColOffset = (colStartOffset + j) - newColsPerBlock * targetBlockColIndex
+        values.append(
+          ((targetBlockRowIndex, targetBlockColIndex), (targetRowOffset, targetColOffset, v)))
+      }
+      values
     }
     val newPartitioner = GridPartitioner(newNumRowBlocks, newNumColBlocks, suggestedNumPartitions)
     val newMatrixBlocksRDD: RDD[MatrixBlock] = slicedBlocks.groupByKey(newPartitioner).
       map { case ((blockRowIndex, blockColIndex), subBlocks) =>
         // TODO: When SparseMatrices are supported for operations like multiply, optimize the
-        // following in terms of storage
+        // following in terms of storage. Not using fromCOO, because operations mostly support
+        // DenseMatrix
         val effRowsPerBlock = math.min(m - blockRowIndex * newRowsPerBlock, newRowsPerBlock).toInt
         val effColsPerBlock = math.min(n - blockColIndex * newColsPerBlock, newColsPerBlock).toInt
         val finalBlock = DenseMatrix.zeros(effRowsPerBlock, effColsPerBlock)
         val values = finalBlock.values
-        subBlocks.foreach { case (rowOffset, colOffset, subBlock) =>
-          subBlock.foreachActive { (i, j, v) =>
-            values((j + colOffset) * effRowsPerBlock + rowOffset + i) += v
-          }
+        subBlocks.foreach { case (rowIndex, colIndex, v) =>
+          values(colIndex * effRowsPerBlock + rowIndex) += v
         }
         ((blockRowIndex, blockColIndex), finalBlock)
       }

From 45dda4949716f76dd36f8af8d1adb83ac8adab25 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Fri, 30 Jan 2015 00:35:53 -0800
Subject: [PATCH 3/3] fixed scalastyle

---
 .../apache/spark/mllib/linalg/distributed/BlockMatrix.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index ce9ba53d2222b..2168eb9bdd562 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -303,9 +303,9 @@ class BlockMatrix(
     val newNumColBlocks = math.ceil(n * 1.0 / newColsPerBlock).toInt
     val slicedBlocks = blocks.flatMap { case ((blockRowIndex, blockColIndex), mat) =>
       val rowStartOffset = blockRowIndex * rowsPerBlock
-      //val rowEndOffset = rowStartOffset + mat.numRows
+      // val rowEndOffset = rowStartOffset + mat.numRows
       val colStartOffset = blockColIndex * colsPerBlock
-      //val colEndOffset = colStartOffset + mat.numCols
+      // val colEndOffset = colStartOffset + mat.numCols

       /*
       // The range of indices that the parts of this block are going to be mapped to
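
A minimal usage sketch of the API added in this series (illustrative only, not part of the patch; it assumes a live SparkContext `sc`, and the 4x4 matrix and block sizes are made up):

    import org.apache.spark.mllib.linalg.Matrices
    import org.apache.spark.mllib.linalg.distributed.BlockMatrix

    // A 4x4 matrix stored as four 2x2 blocks, keyed by (blockRowIndex, blockColIndex).
    val blocks = sc.parallelize(Seq(
      ((0, 0), Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
      ((0, 1), Matrices.dense(2, 2, Array(3.0, 0.0, 0.0, 4.0))),
      ((1, 0), Matrices.dense(2, 2, Array(5.0, 0.0, 0.0, 6.0))),
      ((1, 1), Matrices.dense(2, 2, Array(7.0, 0.0, 0.0, 8.0)))))
    val mat = new BlockMatrix(blocks, 2, 2)

    // Re-cut the same matrix into 3x3 blocks (edge blocks end up smaller); the
    // values are unchanged, only the block boundaries and partitioning move.
    val recut = mat.repartition(3, 3)
    require(recut.toLocalMatrix().toArray.sameElements(mat.toLocalMatrix().toArray))

The zero-argument overload, `repartition()`, re-cuts with the existing `rowsPerBlock` and `colsPerBlock`; as the overlap test in PATCH 1/3 shows, this normalizes blocks whose actual dimensions exceed the declared block size.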
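For reference, the per-entry remapping that PATCH 2/3 introduces inside `mat.foreachActive` can be read as a pure function; this standalone sketch (hypothetical names, not in the patch) mirrors its arithmetic:

    // Maps an entry at global coordinates (globalRow, globalCol) to its new
    // block index and its local offset within that block.
    def target(globalRow: Int, globalCol: Int,
        newRowsPerBlock: Int, newColsPerBlock: Int): ((Int, Int), (Int, Int)) = {
      val blockRow = globalRow / newRowsPerBlock
      val blockCol = globalCol / newColsPerBlock
      ((blockRow, blockCol),
        (globalRow - blockRow * newRowsPerBlock, globalCol - blockCol * newColsPerBlock))
    }

    // With 2x1 target blocks, the entry at global (3, 2) lands in block (1, 2)
    // at local offset (1, 0).
    require(target(3, 2, 2, 1) == ((1, 2), (1, 0)))

Because the remapped entries are grouped by block index and summed into a DenseMatrix on the reduce side, entries from source blocks that 'overlap' the same target cell are added together, which is exactly what the overlap test asserts.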