From 0aa519a4b298562e0088b31524ce96f9ed3a4180 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 29 Jan 2015 14:58:27 -0800
Subject: [PATCH 1/4] [SPARK-5486] Added validate method to BlockMatrix

---
 .../linalg/distributed/BlockMatrix.scala      | 97 ++++++++++++++++++-
 .../linalg/distributed/BlockMatrixSuite.scala | 56 +++++++++++
 2 files changed, 152 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 426dbf4805d5f..4efeb7d50f1ed 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import breeze.linalg.{DenseMatrix => BDM}
 
 import org.apache.spark.{Logging, Partitioner}
-import org.apache.spark.mllib.linalg.{SparseMatrix, DenseMatrix, Matrix}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -172,6 +172,96 @@ class BlockMatrix(
     assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
   }
 
+  def validate(): Unit = {
+    validator match {
+      case (ValidationError.DIMENSION_MISMATCH, e: AssertionError) =>
+        println(s"$e\nPlease instantiate a new BlockMatrix with the correct dimensions.")
+      case (ValidationError.DIMENSION_MISMATCH, exc: Exception) =>
+        println(s"There was an error while calculating the dimensions for this matrix.\n$exc")
+      case (ValidationError.DUPLICATE_INDEX, size: Int) =>
+        println(s"There are $size MatrixBlocks with duplicate indices. Please remove blocks with " +
+          s"duplicate indices. You may call reduceByKey on the underlying RDD and sum the " +
+          s"duplicates. You may convert the matrices to Breeze before summing them up.")
+      case (ValidationError.DUPLICATE_INDEX, duplicates: Map[(Int, Int), Long]) =>
+        println(s"The following indices have more than one Matrix:")
+        duplicates.foreach(index => println(s"Index: ${index._1}, count: ${index._2}"))
+        println("Please remove these blocks with duplicate indices. You may call reduceByKey on " +
+          "the underlying RDD and sum the duplicates. You may convert the matrices to Breeze " +
+          "before summing them up.")
+      case (ValidationError.DUPLICATE_INDEX, exc: Exception) =>
+        println(s"There was an error while looking for duplicate indices.\n$exc")
+      case (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, size: Long) =>
+        println(s"There are $size MatrixBlocks with dimensions different than " +
+          s"rowsPerBlock: $rowsPerBlock, and colsPerBlock: $colsPerBlock. You may use the " +
+          s"repartition method to fix this issue.")
+      case (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH,
+        mismatches: Array[((Int, Int), (Int, Int))]) =>
+        println(s"The following MatrixBlocks have dimensions different than " +
+          s"(rowsPerBlock, colsPerBlock): ($rowsPerBlock, $colsPerBlock)")
+        mismatches.foreach(index => println(s"Index: ${index._1}, dimensions: ${index._2}"))
+        println("You may use the repartition method to fix this issue.")
+      case (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, exc: Exception) =>
+        println(s"There was an error while looking for MatrixBlock dimension mismatches.\n$exc")
+      case (ValidationError.NO_ERROR, _) => println("There are no problems with this BlockMatrix!")
+    }
+  }
+
+  private[mllib] def validator: (ValidationError.Value, Any) = {
+    logDebug("Validating BlockMatrix...")
+    // check if the matrix is larger than the claimed dimensions
+    try {
+      estimateDim()
+      logDebug("BlockMatrix dimensions are okay...")
+    } catch {
+      case exc: AssertionError => return (ValidationError.DIMENSION_MISMATCH, exc)
+      case e: Exception =>
+        logError(s"${e.getMessage}\n${e.getStackTraceString}")
+        return (ValidationError.DIMENSION_MISMATCH, e)
+    }
+    try {
+      // Check if there are multiple MatrixBlocks with the same index.
+      val indexCounts = blocks.countByKey().filter(p => p._2 > 1)
+      if (indexCounts.size > 50) {
+        return (ValidationError.DUPLICATE_INDEX, indexCounts.size)
+      } else if (indexCounts.size > 0) {
+        return (ValidationError.DUPLICATE_INDEX, indexCounts)
+      }
+      logDebug("MatrixBlock indices are okay...")
+    } catch {
+      case e: Exception =>
+        logError(s"${e.getMessage}\n${e.getStackTraceString}")
+        return (ValidationError.DUPLICATE_INDEX, e)
+    }
+    try {
+      // Check if each MatrixBlock (except edges) has the dimensions rowsPerBlock x colsPerBlock
+      // The first tuple is the index and the second tuple is the dimensions of the MatrixBlock
+      val blockDimensionMismatches = blocks.filter { case ((blockRowIndex, blockColIndex), block) =>
+        if ((blockRowIndex == numRowBlocks - 1) || (blockColIndex == numColBlocks - 1)) {
+          false // neglect edge blocks
+        } else {
+          // include it if the dimensions don't match
+          !(block.numRows == rowsPerBlock && block.numCols == colsPerBlock)
+        }
+      }.map { case ((blockRowIndex, blockColIndex), mat) =>
+        ((blockRowIndex, blockColIndex), (mat.numRows, mat.numCols))
+      }
+      val dimensionMismatchCount = blockDimensionMismatches.count()
+      // Don't send whole list if there are more than 50 matrices with the wrong dimensions
+      if (dimensionMismatchCount > 50) {
+        return (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, dimensionMismatchCount)
+      } else if (dimensionMismatchCount > 0) {
+        return (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, blockDimensionMismatches.collect())
+      }
+      logDebug("MatrixBlock dimensions are okay...")
+      logDebug("BlockMatrix is valid!")
+      (ValidationError.NO_ERROR, null)
+    } catch {
+      case e: Exception =>
+        logError(s"${e.getMessage}\n${e.getStackTraceString}")
+        (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, e)
+    }
+  }
+
   /** Caches the underlying RDD. */
   def cache(): this.type = {
     blocks.cache()
     this
   }
@@ -238,3 +328,8 @@ class BlockMatrix(
     new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
   }
 }
+
+private[mllib] object ValidationError extends Enumeration {
+  type ValidationError = Value
+  val NO_ERROR, DUPLICATE_INDEX, MATRIX_BLOCK_DIMENSION_MISMATCH, DIMENSION_MISMATCH, OTHER = Value
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 7284d03d243f5..4a9721d83b4e4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -146,4 +146,60 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
     assert(gridBasedMat.toLocalMatrix() === dense)
     assert(gridBasedMat.toBreeze() === expected)
   }
+
+  test("validator") {
+    // No error
+    val (error0, info0) = gridBasedMat.validator
+    assert(error0 === ValidationError.NO_ERROR)
+
+    // Wrong MatrixBlock dimensions
+    val blocks: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+      ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+      ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+      ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+    val rdd = sc.parallelize(blocks, numPartitions)
+    val wrongRowPerParts = new BlockMatrix(rdd, rowPerPart + 1, colPerPart)
+    val wrongColPerParts = new BlockMatrix(rdd, rowPerPart, colPerPart + 1)
+    val (error, information) = wrongRowPerParts.validator
+    assert(error === ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH)
+    assert(information.isInstanceOf[Array[((Int, Int),(Int, Int))]])
+    val (error2, information2) = wrongColPerParts.validator
+    assert(error2 === ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH)
+    assert(information2.isInstanceOf[Array[((Int, Int),(Int, Int))]])
+    // Large number of mismatching MatrixBlock dimensions
+    val manyBlocks = for (i <- 0 until 60) yield ((i, 0), DenseMatrix.eye(1))
+    val manyWrongDims = new BlockMatrix(sc.parallelize(manyBlocks, numPartitions), 2, 2, 140, 4)
+    val (error3, information3) = manyWrongDims.validator
+    assert(error3 === ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH)
+    assert(information3.isInstanceOf[Long])
+
+    // Wrong BlockMatrix dimensions
+    val wrongRowSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 4, 4)
+    val (error4, information4) = wrongRowSize.validator
+    assert(error4 === ValidationError.DIMENSION_MISMATCH)
+    assert(information4.isInstanceOf[AssertionError])
+    val wrongColSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 5, 2)
+    val (error5, information5) = wrongColSize.validator
+    assert(error5 === ValidationError.DIMENSION_MISMATCH)
+    assert(information5.isInstanceOf[AssertionError])
+
+    // Duplicate indices
+    val duplicateBlocks: Seq[((Int, Int), Matrix)] = Seq(
+      ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
+      ((0, 0), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
+      ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
+      ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
+    val dupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks, numPartitions), 2, 2)
+    val (error6, information6) = dupMatrix.validator
+    assert(error6 === ValidationError.DUPLICATE_INDEX)
+    assert(information6.isInstanceOf[Map[(Int, Int), Long]])
+    val duplicateBlocks2 = for (i <- 0 until 110) yield ((i / 2, i / 2), DenseMatrix.eye(1))
+    val largeDupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks2, numPartitions), 1, 1)
+    val (error7, information7) = largeDupMatrix.validator
+    assert(error7 === ValidationError.DUPLICATE_INDEX)
+    assert(information7.isInstanceOf[Int])
+  }
 }
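Note on patch 1: validator reports problems as a (ValidationError.Value, Any) pair rather than throwing, and the DUPLICATE_INDEX messages tell the caller to sum duplicate blocks via reduceByKey after converting them to Breeze. A minimal sketch of that remedy, under stated assumptions: `mat` is a hypothetical BlockMatrix handle, only dense blocks are handled, and the conversion back to MLlib matrices is left out (Matrices.fromBreeze is private[mllib] at this point):

    import breeze.linalg.{DenseMatrix => BDM}
    import org.apache.spark.SparkContext._ // pair-RDD implicits on pre-1.3 Spark

    // Rebuild each block as a Breeze matrix (toArray is column-major, matching
    // BDM's layout), then sum blocks that share a (rowIndex, colIndex) key.
    val deduped = mat.blocks
      .mapValues(m => new BDM[Double](m.numRows, m.numCols, m.toArray))
      .reduceByKey(_ + _)

After this pass every key is unique, so a BlockMatrix rebuilt from `deduped` would clear the duplicate-index check.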
From 25f083b9efb632af6c335b92d2a0d46d0eb5df59 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Thu, 29 Jan 2015 19:06:32 -0800
Subject: [PATCH 2/4] simplify implementation

---
 .../linalg/distributed/BlockMatrix.scala      | 129 +++++++-----------
 .../linalg/distributed/BlockMatrixSuite.scala |  49 +++----
 2 files changed, 71 insertions(+), 107 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 4efeb7d50f1ed..6132d702d2bea 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import breeze.linalg.{DenseMatrix => BDM}
 
-import org.apache.spark.{Logging, Partitioner}
+import org.apache.spark.{SparkException, Logging, Partitioner}
 import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
@@ -172,94 +172,62 @@ class BlockMatrix(
     assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
   }
 
-  def validate(): Unit = {
-    validator match {
-      case (ValidationError.DIMENSION_MISMATCH, e: AssertionError) =>
-        println(s"$e\nPlease instantiate a new BlockMatrix with the correct dimensions.")
-      case (ValidationError.DIMENSION_MISMATCH, exc: Exception) =>
-        println(s"There was an error while calculating the dimensions for this matrix.\n$exc")
-      case (ValidationError.DUPLICATE_INDEX, size: Int) =>
-        println(s"There are $size MatrixBlocks with duplicate indices. Please remove blocks with " +
-          s"duplicate indices. You may call reduceByKey on the underlying RDD and sum the " +
-          s"duplicates. You may convert the matrices to Breeze before summing them up.")
-      case (ValidationError.DUPLICATE_INDEX, duplicates: Map[(Int, Int), Long]) =>
-        println(s"The following indices have more than one Matrix:")
-        duplicates.foreach(index => println(s"Index: ${index._1}, count: ${index._2}"))
-        println("Please remove these blocks with duplicate indices. You may call reduceByKey on " +
-          "the underlying RDD and sum the duplicates. You may convert the matrices to Breeze " +
-          "before summing them up.")
-      case (ValidationError.DUPLICATE_INDEX, exc: Exception) =>
-        println(s"There was an error while looking for duplicate indices.\n$exc")
-      case (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, size: Long) =>
-        println(s"There are $size MatrixBlocks with dimensions different than " +
-          s"rowsPerBlock: $rowsPerBlock, and colsPerBlock: $colsPerBlock. You may use the " +
-          s"repartition method to fix this issue.")
-      case (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH,
-        mismatches: Array[((Int, Int), (Int, Int))]) =>
-        println(s"The following MatrixBlocks have dimensions different than " +
-          s"(rowsPerBlock, colsPerBlock): ($rowsPerBlock, $colsPerBlock)")
-        mismatches.foreach(index => println(s"Index: ${index._1}, dimensions: ${index._2}"))
-        println("You may use the repartition method to fix this issue.")
-      case (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, exc: Exception) =>
-        println(s"There was an error while looking for MatrixBlock dimension mismatches.\n$exc")
-      case (ValidationError.NO_ERROR, _) => println("There are no problems with this BlockMatrix!")
-    }
-  }
-
-  private[mllib] def validator: (ValidationError.Value, Any) = {
+  def validate: Unit = {
     logDebug("Validating BlockMatrix...")
     // check if the matrix is larger than the claimed dimensions
     try {
       estimateDim()
       logDebug("BlockMatrix dimensions are okay...")
     } catch {
-      case exc: AssertionError => return (ValidationError.DIMENSION_MISMATCH, exc)
+      case exc: AssertionError => throw new SparkException(s"$exc\nPlease instantiate a " +
+        s"new BlockMatrix with the correct dimensions.")
       case e: Exception =>
-        logError(s"${e.getMessage}\n${e.getStackTraceString}")
-        return (ValidationError.DIMENSION_MISMATCH, e)
+        throw new SparkException(s"${e.getMessage}\n${e.getStackTraceString}")
     }
-    try {
-      // Check if there are multiple MatrixBlocks with the same index.
-      val indexCounts = blocks.countByKey().filter(p => p._2 > 1)
-      if (indexCounts.size > 50) {
-        return (ValidationError.DUPLICATE_INDEX, indexCounts.size)
-      } else if (indexCounts.size > 0) {
-        return (ValidationError.DUPLICATE_INDEX, indexCounts)
-      }
-      logDebug("MatrixBlock indices are okay...")
-    } catch {
-      case e: Exception =>
-        logError(s"${e.getMessage}\n${e.getStackTraceString}")
-        return (ValidationError.DUPLICATE_INDEX, e)
+    // Check if there are multiple MatrixBlocks with the same index.
+    val indexCounts = blocks.countByKey().filter(p => p._2 > 1)
+    if (indexCounts.size > 50) {
+      throw new SparkException(s"There are ${indexCounts.size} MatrixBlocks with duplicate " +
+        s"indices. Please remove blocks with duplicate indices. You may call reduceByKey on " +
+        s"the underlying RDD and sum the duplicates. You may convert the matrices to Breeze " +
+        s"before summing them up.")
+    } else if (indexCounts.size > 0) {
+      var errorMsg = s"The following indices have more than one Matrix:\n"
+      indexCounts.foreach(index => errorMsg += s"Index: ${index._1}, count: ${index._2}\n")
+      errorMsg += "Please remove these blocks with duplicate indices. You may call " +
+        "reduceByKey on the underlying RDD and sum the duplicates. You may convert the " +
+        "matrices to Breeze before summing them up."
+      throw new SparkException(errorMsg)
     }
+    logDebug("MatrixBlock indices are okay...")
-    try {
-      // Check if each MatrixBlock (except edges) has the dimensions rowsPerBlock x colsPerBlock
-      // The first tuple is the index and the second tuple is the dimensions of the MatrixBlock
-      val blockDimensionMismatches = blocks.filter { case ((blockRowIndex, blockColIndex), block) =>
-        if ((blockRowIndex == numRowBlocks - 1) || (blockColIndex == numColBlocks - 1)) {
-          false // neglect edge blocks
-        } else {
-          // include it if the dimensions don't match
-          !(block.numRows == rowsPerBlock && block.numCols == colsPerBlock)
-        }
-      }.map { case ((blockRowIndex, blockColIndex), mat) =>
-        ((blockRowIndex, blockColIndex), (mat.numRows, mat.numCols))
-      }
-      val dimensionMismatchCount = blockDimensionMismatches.count()
-      // Don't send whole list if there are more than 50 matrices with the wrong dimensions
-      if (dimensionMismatchCount > 50) {
-        return (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, dimensionMismatchCount)
-      } else if (dimensionMismatchCount > 0) {
-        return (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, blockDimensionMismatches.collect())
-      }
-      logDebug("MatrixBlock dimensions are okay...")
-      logDebug("BlockMatrix is valid!")
-      (ValidationError.NO_ERROR, null)
-    } catch {
-      case e: Exception =>
-        logError(s"${e.getMessage}\n${e.getStackTraceString}")
-        (ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH, e)
-    }
+    // Check if each MatrixBlock (except edges) has the dimensions rowsPerBlock x colsPerBlock
+    // The first tuple is the index and the second tuple is the dimensions of the MatrixBlock
+    val blockDimensionMismatches = blocks.filter { case ((blockRowIndex, blockColIndex), block) =>
+      if ((blockRowIndex == numRowBlocks - 1) || (blockColIndex == numColBlocks - 1)) {
+        false // neglect edge blocks
+      } else {
+        // include it if the dimensions don't match
+        !(block.numRows == rowsPerBlock && block.numCols == colsPerBlock)
+      }
+    }.map { case ((blockRowIndex, blockColIndex), mat) =>
+      ((blockRowIndex, blockColIndex), (mat.numRows, mat.numCols))
+    }
+    val dimensionMismatchCount = blockDimensionMismatches.count()
+    // Don't send whole list if there are more than 50 matrices with the wrong dimensions
+    if (dimensionMismatchCount > 50) {
+      throw new SparkException(s"There are $dimensionMismatchCount MatrixBlocks with dimensions " +
+        s"different than rowsPerBlock: $rowsPerBlock, and colsPerBlock: $colsPerBlock. You may " +
+        s"use the repartition method to fix this issue.")
+    } else if (dimensionMismatchCount > 0) {
+      val mismatches = blockDimensionMismatches.collect()
+      var errorMsg = s"The following MatrixBlocks have dimensions different than " +
+        s"(rowsPerBlock, colsPerBlock): ($rowsPerBlock, $colsPerBlock)\n"
+      mismatches.foreach(index => errorMsg += s"Index: ${index._1}, dimensions: ${index._2}\n")
+      errorMsg += "You may use the repartition method to fix this issue."
+      throw new SparkException(errorMsg)
+    }
+    logDebug("MatrixBlock dimensions are okay...")
+    logDebug("BlockMatrix is valid!")
   }
 
   /** Caches the underlying RDD. */
@@ -328,8 +296,3 @@ class BlockMatrix(
     new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
   }
 }
-
-private[mllib] object ValidationError extends Enumeration {
-  type ValidationError = Value
-  val NO_ERROR, DUPLICATE_INDEX, MATRIX_BLOCK_DIMENSION_MISMATCH, DIMENSION_MISMATCH, OTHER = Value
-}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 4a9721d83b4e4..9b2e039210082 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.mllib.linalg.distributed
 
+import org.apache.spark.SparkException
+
 import scala.util.Random
 
 import breeze.linalg.{DenseMatrix => BDM}
 import org.scalatest.FunSuite
 
@@ -147,10 +149,9 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
     assert(gridBasedMat.toLocalMatrix() === dense)
     assert(gridBasedMat.toBreeze() === expected)
   }
 
-  test("validator") {
+  test("validate") {
     // No error
-    val (error0, info0) = gridBasedMat.validator
-    assert(error0 === ValidationError.NO_ERROR)
+    gridBasedMat.validate
 
     // Wrong MatrixBlock dimensions
     val blocks: Seq[((Int, Int), Matrix)] = Seq(
       ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
       ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
       ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
       ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
       ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
     val rdd = sc.parallelize(blocks, numPartitions)
     val wrongRowPerParts = new BlockMatrix(rdd, rowPerPart + 1, colPerPart)
     val wrongColPerParts = new BlockMatrix(rdd, rowPerPart, colPerPart + 1)
-    val (error, information) = wrongRowPerParts.validator
-    assert(error === ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH)
-    assert(information.isInstanceOf[Array[((Int, Int),(Int, Int))]])
-    val (error2, information2) = wrongColPerParts.validator
-    assert(error2 === ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH)
-    assert(information2.isInstanceOf[Array[((Int, Int),(Int, Int))]])
+    intercept[SparkException] {
+      wrongRowPerParts.validate
+    }
+    intercept[SparkException] {
+      wrongColPerParts.validate
+    }
     // Large number of mismatching MatrixBlock dimensions
     val manyBlocks = for (i <- 0 until 60) yield ((i, 0), DenseMatrix.eye(1))
     val manyWrongDims = new BlockMatrix(sc.parallelize(manyBlocks, numPartitions), 2, 2, 140, 4)
-    val (error3, information3) = manyWrongDims.validator
-    assert(error3 === ValidationError.MATRIX_BLOCK_DIMENSION_MISMATCH)
-    assert(information3.isInstanceOf[Long])
+    intercept[SparkException] {
+      manyWrongDims.validate
+    }
 
     // Wrong BlockMatrix dimensions
     val wrongRowSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 4, 4)
-    val (error4, information4) = wrongRowSize.validator
-    assert(error4 === ValidationError.DIMENSION_MISMATCH)
-    assert(information4.isInstanceOf[AssertionError])
+    intercept[SparkException] {
+      wrongRowSize.validate
+    }
     val wrongColSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 5, 2)
-    val (error5, information5) = wrongColSize.validator
-    assert(error5 === ValidationError.DIMENSION_MISMATCH)
-    assert(information5.isInstanceOf[AssertionError])
+    intercept[SparkException] {
+      wrongColSize.validate
+    }
 
     // Duplicate indices
     val duplicateBlocks: Seq[((Int, Int), Matrix)] = Seq(
       ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
       ((0, 0), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
       ((1, 1), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
       ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
       ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
     val dupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks, numPartitions), 2, 2)
-    val (error6, information6) = dupMatrix.validator
-    assert(error6 === ValidationError.DUPLICATE_INDEX)
-    assert(information6.isInstanceOf[Map[(Int, Int), Long]])
+    intercept[SparkException] {
+      dupMatrix.validate
+    }
     val duplicateBlocks2 = for (i <- 0 until 110) yield ((i / 2, i / 2), DenseMatrix.eye(1))
     val largeDupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks2, numPartitions), 1, 1)
-    val (error7, information7) = largeDupMatrix.validator
-    assert(error7 === ValidationError.DUPLICATE_INDEX)
-    assert(information7.isInstanceOf[Int])
+    intercept[SparkException] {
+      largeDupMatrix.validate
+    }
   }
 }
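Note on patch 2: validate now signals problems by throwing SparkException instead of returning an error code, which is why the suite switches to ScalaTest's intercept. A caller-side sketch of the pattern this implies (`blockMat` is an assumed BlockMatrix; in this revision validate is parameterless, so it is called without parentheses):

    import org.apache.spark.SparkException

    try {
      blockMat.validate
    } catch {
      case e: SparkException =>
        // The message carries the remediation hint: reduceByKey the duplicate
        // blocks, or repartition the blocks with mismatched dimensions.
        println(s"Invalid BlockMatrix: ${e.getMessage}")
    }

One consequence of the design: the checks run eagerly as Spark jobs (countByKey and count are actions), so calling validate is a deliberate, potentially expensive debugging step rather than something for a hot path.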
From b55ac5c12a1a7cfcfa9933073d7ec9bc1cca7a10 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Fri, 30 Jan 2015 00:15:11 -0800
Subject: [PATCH 3/4] addressed code review v1

---
 .../linalg/distributed/BlockMatrix.scala      | 70 ++++++-------------
 .../linalg/distributed/BlockMatrixSuite.scala | 29 ++------
 2 files changed, 29 insertions(+), 70 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 6132d702d2bea..ca5cd24e10cc6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -158,11 +158,13 @@ class BlockMatrix(
   private[mllib] var partitioner: GridPartitioner =
     GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)
 
+  private lazy val blockInfo = blocks.mapValues(block => (block.numRows, block.numCols)).cache()
+
   /** Estimates the dimensions of the matrix. */
   private def estimateDim(): Unit = {
-    val (rows, cols) = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
-      (blockRowIndex.toLong * rowsPerBlock + mat.numRows,
-        blockColIndex.toLong * colsPerBlock + mat.numCols)
+    val (rows, cols) = blockInfo.map { case ((blockRowIndex, blockColIndex), (m, n)) =>
+      (blockRowIndex.toLong * rowsPerBlock + m,
+        blockColIndex.toLong * colsPerBlock + n)
     }.reduce { (x0, x1) =>
       (math.max(x0._1, x1._1), math.max(x0._2, x1._2))
     }
@@ -172,59 +174,31 @@ class BlockMatrix(
     assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
   }
 
-  def validate: Unit = {
+  def validate(): Unit = {
     logDebug("Validating BlockMatrix...")
     // check if the matrix is larger than the claimed dimensions
-    try {
-      estimateDim()
-      logDebug("BlockMatrix dimensions are okay...")
-    } catch {
-      case exc: AssertionError => throw new SparkException(s"$exc\nPlease instantiate a " +
-        s"new BlockMatrix with the correct dimensions.")
-      case e: Exception =>
-        throw new SparkException(s"${e.getMessage}\n${e.getStackTraceString}")
-    }
+    estimateDim()
+    logDebug("BlockMatrix dimensions are okay...")
+
     // Check if there are multiple MatrixBlocks with the same index.
-    val indexCounts = blocks.countByKey().filter(p => p._2 > 1)
-    if (indexCounts.size > 50) {
-      throw new SparkException(s"There are ${indexCounts.size} MatrixBlocks with duplicate " +
-        s"indices. Please remove blocks with duplicate indices. You may call reduceByKey on " +
-        s"the underlying RDD and sum the duplicates. You may convert the matrices to Breeze " +
-        s"before summing them up.")
-    } else if (indexCounts.size > 0) {
-      var errorMsg = s"The following indices have more than one Matrix:\n"
-      indexCounts.foreach(index => errorMsg += s"Index: ${index._1}, count: ${index._2}\n")
-      errorMsg += "Please remove these blocks with duplicate indices. You may call " +
-        "reduceByKey on the underlying RDD and sum the duplicates. You may convert the " +
-        "matrices to Breeze before summing them up."
-      throw new SparkException(errorMsg)
+    val indexCounts = blockInfo.countByKey().foreach { case (key, cnt) =>
+      if (cnt > 1) {
+        throw new SparkException(s"There are MatrixBlocks with duplicate indices. Please remove " +
+          s"blocks with duplicate indices. You may call reduceByKey on the underlying RDD and " +
+          s"sum the duplicates. You may convert the matrices to Breeze before summing them up.")
+      }
     }
     logDebug("MatrixBlock indices are okay...")
     // Check if each MatrixBlock (except edges) has the dimensions rowsPerBlock x colsPerBlock
     // The first tuple is the index and the second tuple is the dimensions of the MatrixBlock
-    val blockDimensionMismatches = blocks.filter { case ((blockRowIndex, blockColIndex), block) =>
-      if ((blockRowIndex == numRowBlocks - 1) || (blockColIndex == numColBlocks - 1)) {
-        false // neglect edge blocks
-      } else {
-        // include it if the dimensions don't match
-        !(block.numRows == rowsPerBlock && block.numCols == colsPerBlock)
+    blockInfo.foreach { case ((blockRowIndex, blockColIndex), (m, n)) =>
+      if (m != rowsPerBlock || n != colsPerBlock) {
+        if (blockRowIndex != numRowBlocks - 1 || blockColIndex != numColBlocks - 1) {
+          throw new SparkException(s"There are MatrixBlocks with dimensions different than " +
+            s"rowsPerBlock: $rowsPerBlock, and colsPerBlock: $colsPerBlock. You may " +
+            s"use the repartition method to fix this issue.")
+        }
       }
-    }.map { case ((blockRowIndex, blockColIndex), mat) =>
-      ((blockRowIndex, blockColIndex), (mat.numRows, mat.numCols))
     }
-    val dimensionMismatchCount = blockDimensionMismatches.count()
-    // Don't send whole list if there are more than 50 matrices with the wrong dimensions
-    if (dimensionMismatchCount > 50) {
-      throw new SparkException(s"There are $dimensionMismatchCount MatrixBlocks with dimensions " +
-        s"different than rowsPerBlock: $rowsPerBlock, and colsPerBlock: $colsPerBlock. You may " +
-        s"use the repartition method to fix this issue.")
-    } else if (dimensionMismatchCount > 0) {
-      val mismatches = blockDimensionMismatches.collect()
-      var errorMsg = s"The following MatrixBlocks have dimensions different than " +
-        s"(rowsPerBlock, colsPerBlock): ($rowsPerBlock, $colsPerBlock)\n"
-      mismatches.foreach(index => errorMsg += s"Index: ${index._1}, dimensions: ${index._2}\n")
-      errorMsg += "You may use the repartition method to fix this issue."
-      throw new SparkException(errorMsg)
-    }
     logDebug("MatrixBlock dimensions are okay...")
     logDebug("BlockMatrix is valid!")
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
index 9b2e039210082..2635f5ba2b0cf 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.mllib.linalg.distributed
 
-import org.apache.spark.SparkException
-
 import scala.util.Random
 
 import breeze.linalg.{DenseMatrix => BDM}
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkException
 import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 
@@ -151,8 +150,7 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
 
   test("validate") {
     // No error
-    gridBasedMat.validate
-
+    gridBasedMat.validate()
     // Wrong MatrixBlock dimensions
     val blocks: Seq[((Int, Int), Matrix)] = Seq(
       ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
       ((0, 1), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
       ((1, 0), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
       ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
       ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
     val rdd = sc.parallelize(blocks, numPartitions)
     val wrongRowPerParts = new BlockMatrix(rdd, rowPerPart + 1, colPerPart)
     val wrongColPerParts = new BlockMatrix(rdd, rowPerPart, colPerPart + 1)
     intercept[SparkException] {
-      wrongRowPerParts.validate
-    }
-    intercept[SparkException] {
-      wrongColPerParts.validate
+      wrongRowPerParts.validate()
     }
-    // Large number of mismatching MatrixBlock dimensions
-    val manyBlocks = for (i <- 0 until 60) yield ((i, 0), DenseMatrix.eye(1))
-    val manyWrongDims = new BlockMatrix(sc.parallelize(manyBlocks, numPartitions), 2, 2, 140, 4)
     intercept[SparkException] {
-      manyWrongDims.validate
+      wrongColPerParts.validate()
     }
-
     // Wrong BlockMatrix dimensions
     val wrongRowSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 4, 4)
     intercept[SparkException] {
-      wrongRowSize.validate
+      wrongRowSize.validate()
     }
     val wrongColSize = new BlockMatrix(rdd, rowPerPart, colPerPart, 5, 2)
     intercept[SparkException] {
-      wrongColSize.validate
+      wrongColSize.validate()
     }
-
     // Duplicate indices
     val duplicateBlocks: Seq[((Int, Int), Matrix)] = Seq(
       ((0, 0), new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 2.0))),
       ((0, 0), new DenseMatrix(2, 2, Array(0.0, 1.0, 0.0, 0.0))),
       ((1, 1), new DenseMatrix(2, 2, Array(3.0, 0.0, 1.0, 1.0))),
       ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))),
       ((2, 1), new DenseMatrix(1, 2, Array(1.0, 5.0))))
     val dupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks, numPartitions), 2, 2)
     intercept[SparkException] {
-      dupMatrix.validate
-    }
-    val duplicateBlocks2 = for (i <- 0 until 110) yield ((i / 2, i / 2), DenseMatrix.eye(1))
-    val largeDupMatrix = new BlockMatrix(sc.parallelize(duplicateBlocks2, numPartitions), 1, 1)
-    intercept[SparkException] {
-      largeDupMatrix.validate
+      dupMatrix.validate()
     }
   }
 }
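Note on patch 3: the review feedback replaces repeated passes over the full matrices with a single cached metadata RDD, blockInfo, holding only each block's index and dimensions; estimateDim, the duplicate check, and the dimension check all reuse it. A standalone sketch of the same pattern (names assumed, not the patch's API):

    import org.apache.spark.SparkContext._ // pair-RDD implicits on pre-1.3 Spark
    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.Matrix

    // Ship only (rows, cols) per block: tiny compared to the matrix data, and
    // cached so the several validation passes reuse one computation.
    def blockInfoOf(blocks: RDD[((Int, Int), Matrix)]): RDD[((Int, Int), (Int, Int))] =
      blocks.mapValues(block => (block.numRows, block.numCols)).cache()

One subtlety: the dimension check now throws from inside an RDD foreach, so the exception is raised in an executor task and should surface on the driver wrapped in the job-failure SparkException, which is why intercept[SparkException] in the suite should still pass.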
From c152a739d7d7d0e091fcb3676225e2a5fb9356f1 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Fri, 30 Jan 2015 10:20:00 -0800
Subject: [PATCH 4/4] addressed code review v2

---
 .../linalg/distributed/BlockMatrix.scala      | 25 +++++++++++--------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index e7334564e870f..a6405975ebe2e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -181,23 +181,28 @@ class BlockMatrix(
     logDebug("BlockMatrix dimensions are okay...")
 
     // Check if there are multiple MatrixBlocks with the same index.
-    val indexCounts = blockInfo.countByKey().foreach { case (key, cnt) =>
+    blockInfo.countByKey().foreach { case (key, cnt) =>
       if (cnt > 1) {
-        throw new SparkException(s"There are MatrixBlocks with duplicate indices. Please remove " +
-          s"blocks with duplicate indices. You may call reduceByKey on the underlying RDD and " +
-          s"sum the duplicates. You may convert the matrices to Breeze before summing them up.")
+        throw new SparkException(s"Found multiple MatrixBlocks with the indices $key. Please " +
+          "remove blocks with duplicate indices.")
       }
     }
     logDebug("MatrixBlock indices are okay...")
     // Check if each MatrixBlock (except edges) has the dimensions rowsPerBlock x colsPerBlock
     // The first tuple is the index and the second tuple is the dimensions of the MatrixBlock
+    val dimensionMsg = s"dimensions different than rowsPerBlock: $rowsPerBlock, and " +
+      s"colsPerBlock: $colsPerBlock. Blocks on the right and bottom edges can have smaller " +
+      s"dimensions. You may use the repartition method to fix this issue."
     blockInfo.foreach { case ((blockRowIndex, blockColIndex), (m, n)) =>
-      if (m != rowsPerBlock || n != colsPerBlock) {
-        if (blockRowIndex != numRowBlocks - 1 || blockColIndex != numColBlocks - 1) {
-          throw new SparkException(s"There are MatrixBlocks with dimensions different than " +
-            s"rowsPerBlock: $rowsPerBlock, and colsPerBlock: $colsPerBlock. You may " +
-            s"use the repartition method to fix this issue.")
-        }
+      if ((blockRowIndex < numRowBlocks - 1 && m != rowsPerBlock) ||
+          (blockRowIndex == numRowBlocks - 1 && (m <= 0 || m > rowsPerBlock))) {
+        throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
+          dimensionMsg)
+      }
+      if ((blockColIndex < numColBlocks - 1 && n != colsPerBlock) ||
+          (blockColIndex == numColBlocks - 1 && (n <= 0 || n > colsPerBlock))) {
+        throw new SparkException(s"The MatrixBlock at ($blockRowIndex, $blockColIndex) has " +
+          dimensionMsg)
       }
     }
     logDebug("MatrixBlock dimensions are okay...")
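Note on patch 4: the final revision names the offending block index in each message and tightens the edge rule so it is checked per axis: inner blocks must measure exactly rowsPerBlock x colsPerBlock, while blocks in the last block-row or block-column may be smaller but never empty or larger. A standalone restatement of that rule (assumed names, for illustration only):

    // True when a block of size m x n is legal at block position (rowIdx, colIdx).
    def blockDimsOk(
        rowIdx: Int, colIdx: Int, m: Int, n: Int,
        numRowBlocks: Int, numColBlocks: Int,
        rowsPerBlock: Int, colsPerBlock: Int): Boolean = {
      val rowOk =
        if (rowIdx < numRowBlocks - 1) m == rowsPerBlock // inner block-row: exact
        else m > 0 && m <= rowsPerBlock                  // last block-row: (0, rowsPerBlock]
      val colOk =
        if (colIdx < numColBlocks - 1) n == colsPerBlock // inner block-column: exact
        else n > 0 && n <= colsPerBlock                  // last block-column: (0, colsPerBlock]
      rowOk && colOk
    }

For the suite's 5 x 4 matrix with 2 x 2 blocks (numRowBlocks = 3, numColBlocks = 2), the 1 x 2 block at (2, 1) passes. Patch 3's condition exempted only the bottom-right corner block, so it would have rejected a legitimate 1 x 2 block at (2, 0) in the last block-row; the per-axis test accepts it while still rejecting oversized or empty blocks anywhere.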