Skip to content

Commit

Permalink
Merge pull request #1 from mengxr/SPARK-4409
Browse files Browse the repository at this point in the history
Some updates for linear algebra utilities
  • Loading branch information
brkyvz committed Dec 24, 2014
2 parents 10a63a6 + 80cfa29 commit 04c4829
Showing 1 changed file with 72 additions and 70 deletions.
142 changes: 72 additions & 70 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
Original file line number Diff line number Diff line change
Expand Up @@ -361,47 +361,47 @@ object SparseMatrix {
* @param entries Array of (i, j, value) tuples
* @return The corresponding `SparseMatrix`
*/
def fromCOO(numRows: Int, numCols: Int, entries: Array[(Int, Int, Double)]): SparseMatrix = {
val sortedEntries = entries.sortBy(v => (v._2, v._1))
val colPtrs = new Array[Int](numCols + 1)
var nnz = 0
var lastCol = -1
var lastIndex = -1
sortedEntries.foreach { case (i, j, v) =>
require(i >= 0 && j >= 0, "Negative indices given. Please make sure all indices are " +
s"greater than or equal to zero. i: $i, j: $j, value: $v")
if (v != 0.0) {
while (j != lastCol) {
colPtrs(lastCol + 1) = nnz
lastCol += 1
}
val index = j * numRows + i
if (lastIndex != index) {
nnz += 1
lastIndex = index
}
def fromCOO(numRows: Int, numCols: Int, entries: Iterable[(Int, Int, Double)]): SparseMatrix = {
val sortedEntries = entries.toSeq.sortBy(v => (v._2, v._1))
val numEntries = sortedEntries.size
if (sortedEntries.nonEmpty) {
// Since the entries are sorted by column index, we only need to check the first and the last.
for (col <- Seq(sortedEntries.head._2, sortedEntries.last._2)) {
require(col >= 0 && col < numCols, s"Column index out of range [0, $numCols): $col.")
}
}
while (numCols > lastCol) {
colPtrs(lastCol + 1) = nnz
lastCol += 1
}
val values = new Array[Double](nnz)
val rowIndices = new Array[Int](nnz)
lastIndex = -1
var cnt = -1
sortedEntries.foreach { case (i, j, v) =>
if (v != 0.0) {
val index = j * numRows + i
if (lastIndex != index) {
cnt += 1
lastIndex = index
val colPtrs = new Array[Int](numCols + 1)
val rowIndices = MArrayBuilder.make[Int]
rowIndices.sizeHint(numEntries)
val values = MArrayBuilder.make[Double]
values.sizeHint(numEntries)
var nnz = 0
var prevCol = 0
var prevRow = -1
var prevVal = 0.0
// Append a dummy entry to include the last one at the end of the loop.
(sortedEntries.view :+ (numRows, numCols, 1.0)).foreach { case (i, j, v) =>
if (v != 0) {
if (i == prevRow && j == prevCol) {
prevVal += v
} else {
if (prevVal != 0) {
require(prevRow >= 0 && prevRow < numRows,
s"Row index out of range [0, $numRows): $prevRow.")
nnz += 1
rowIndices += prevRow
values += prevVal
}
prevRow = i
prevVal = v
while (prevCol < j) {
colPtrs(prevCol + 1) = nnz
prevCol += 1
}
}
values(cnt) += v
rowIndices(cnt) = i
}
}
new SparseMatrix(numRows, numCols, colPtrs.toArray, rowIndices, values)
new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), values.result())
}

/**
Expand All @@ -413,54 +413,59 @@ object SparseMatrix {
new SparseMatrix(n, n, (0 to n).toArray, (0 until n).toArray, Array.fill(n)(1.0))
}

/** Generates the skeleton of a random `SparseMatrix` with a given random number generator. */
/**
* Generates the skeleton of a random `SparseMatrix` with a given random number generator.
* The values of the matrix returned are undefined.
*/
private def genRandMatrix(
numRows: Int,
numCols: Int,
density: Double,
rng: Random): SparseMatrix = {
require(density >= 0.0 && density <= 1.0, "density must be a double in the range " +
s"0.0 <= d <= 1.0. Currently, density: $density")
val length = math.ceil(numRows * numCols * density).toInt
var i = 0
require(numRows > 0, s"numRows must be greater than 0 but got $numRows")
require(numCols > 0, s"numCols must be greater than 0 but got $numCols")
require(density >= 0.0 && density <= 1.0,
s"density must be a double in the range 0.0 <= d <= 1.0. Currently, density: $density")
val size = numRows.toLong * numCols
val expected = size * density
assert(expected < Int.MaxValue,
"The expected number of nonzeros cannot be greater than Int.MaxValue.")
val nnz = math.ceil(expected).toInt
if (density == 0.0) {
return new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1),
Array[Int](), Array[Double]())
new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), Array[Int](), Array[Double]())
} else if (density == 1.0) {
val rowIndices = Array.tabulate(numCols, numRows)((j, i) => i).flatten
return new SparseMatrix(numRows, numCols, (0 to numRows * numCols by numRows).toArray,
rowIndices, new Array[Double](numRows * numCols))
}
if (density < 0.34) { // Expected number of iterations is less than 1.5 * length
val colPtrs = Array.tabulate(numCols + 1)(j => j * numRows)
val rowIndices = Array.tabulate(size.toInt)(idx => idx % numRows)
new SparseMatrix(numRows, numCols, colPtrs, rowIndices, new Array[Double](numRows * numCols))
} else if (density < 0.34) {
// draw-by-draw, expected number of iterations is less than 1.5 * nnz
val entries = MHashSet[(Int, Int)]()
while (entries.size < length) {
while (entries.size < nnz) {
entries += ((rng.nextInt(numRows), rng.nextInt(numCols)))
}
val entryList = entries.toArray.map(v => (v._1, v._2, 1.0))
SparseMatrix.fromCOO(numRows, numCols, entryList)
} else { // selection - rejection method
SparseMatrix.fromCOO(numRows, numCols, entries.map(v => (v._1, v._2, 1.0)))
} else {
// selection-rejection method
var idx = 0L
var numSelected = 0
var i = 0
var j = 0
val pool = numRows * numCols
val rowIndexBuilder = new MArrayBuilder.ofInt
val colPtrs = new Array[Int](numCols + 1)
while (i < length && j < numCols) {
var passedInPool = j * numRows
var r = 0
while (i < length && r < numRows) {
if (rng.nextDouble() < 1.0 * (length - i) / (pool - passedInPool)) {
rowIndexBuilder += r
i += 1
val rowIndices = new Array[Int](nnz)
while (j < numCols && numSelected < nnz) {
while (i < numRows && numSelected < nnz) {
if (rng.nextDouble() < 1.0 * (nnz - numSelected) / (size - idx)) {
rowIndices(numSelected) = i
numSelected += 1
}
r += 1
passedInPool += 1
i += 1
idx += 1
}
colPtrs(j + 1) = numSelected
j += 1
colPtrs(j) = i
}
val rowIndices = rowIndexBuilder.result()
new SparseMatrix(numRows, numCols, colPtrs, rowIndices, new Array[Double](rowIndices.size))
new SparseMatrix(numRows, numCols, colPtrs, rowIndices, new Array[Double](nnz))
}

}

/**
Expand Down Expand Up @@ -735,16 +740,13 @@ object Matrices {
val numCols = matrices(0).numCols
var hasSparse = false
var numRows = 0
var valsLength = 0
matrices.foreach { mat =>
require(numCols == mat.numCols, "The number of rows of the matrices in this sequence, " +
"don't match!")
mat match {
case sparse: SparseMatrix =>
hasSparse = true
valsLength += sparse.values.length
case dense: DenseMatrix =>
valsLength += dense.values.length
case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " +
s"SparseMatrix or DenseMatrix. Instead got: ${mat.getClass}")
}
Expand Down

0 comments on commit 04c4829

Please sign in to comment.