Skip to content

Commit

Permalink
[SPARK-4409] Third pass of code review
Browse files Browse the repository at this point in the history
  • Loading branch information
brkyvz committed Dec 19, 2014
1 parent 75239f8 commit 3971c93
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 97 deletions.
236 changes: 141 additions & 95 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg

import java.util.{Arrays, Random}

import scala.collection.mutable.{ArrayBuffer, Map}
import scala.collection.mutable.{ArrayBuffer, ArrayBuilder, Map}

import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM}

Expand Down Expand Up @@ -150,31 +150,35 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double])

/** Generate a `SparseMatrix` from the given `DenseMatrix`. */
def toSparse(): SparseMatrix = {
val sparseA: ArrayBuffer[Double] = new ArrayBuffer()
val sCols: ArrayBuffer[Int] = new ArrayBuffer(numCols + 1)
val sRows: ArrayBuffer[Int] = new ArrayBuffer()
var i = 0
val spVals: ArrayBuilder[Double] = new ArrayBuilder.ofDouble
val colPtrs: Array[Int] = new Array[Int](numCols + 1)
val rowIndices: ArrayBuilder[Int] = new ArrayBuilder.ofInt
var nnz = 0
var lastCol = -1
values.foreach { v =>
val r = i % numRows
val c = (i - r) / numRows
if (v != 0.0) {
sRows.append(r)
sparseA.append(v)
while (c != lastCol) {
sCols.append(nnz)
lastCol += 1
var j = 0
while (j < numCols) {
var i = 0
val indStart = j * numRows
while (i < numRows) {
val v = values(indStart + i)
if (v != 0.0) {
rowIndices += i
spVals += v
while (j != lastCol) {
colPtrs(lastCol + 1) = nnz
lastCol += 1
}
nnz += 1
}
nnz += 1
i += 1
}
i += 1
j += 1
}
while (numCols > lastCol) {
sCols.append(sparseA.length)
colPtrs(lastCol + 1) = nnz
lastCol += 1
}
new SparseMatrix(numRows, numCols, sCols.toArray, sRows.toArray, sparseA.toArray)
new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), spVals.result())
}
}

Expand Down Expand Up @@ -358,35 +362,30 @@ object SparseMatrix {

/**
* Generate a `SparseMatrix` from Coordinate List (COO) format. Input must be an array of
* (row, column, value) tuples. Array must be sorted first by *column* index and then by row
* index.
* (row, column, value) tuples.
* @param numRows number of rows of the matrix
* @param numCols number of columns of the matrix
* @param entries Array of ((row, column), value) tuples
* @param entries Array of (row, column, value) tuples
* @return The corresponding `SparseMatrix`
*/
def fromCOO(numRows: Int, numCols: Int, entries: Array[((Int, Int), Double)]): SparseMatrix = {
val colPtrs = new ArrayBuffer[Int](numCols + 1)
colPtrs.append(0)
def fromCOO(numRows: Int, numCols: Int, entries: Array[(Int, Int, Double)]): SparseMatrix = {
val sortedEntries = entries.sortBy(v => (v._2, v._1))
val colPtrs = new Array[Int](numCols + 1)
var nnz = 0
var lastCol = 0
val values = entries.map { case ((i, j), v) =>
var lastCol = -1
val values = sortedEntries.map { case (i, j, v) =>
while (j != lastCol) {
colPtrs.append(nnz)
colPtrs(lastCol + 1) = nnz
lastCol += 1
if (lastCol > numCols) {
throw new IndexOutOfBoundsException("Please make sure that the entries array is " +
"sorted by COLUMN index first and then by row index.")
}
}
nnz += 1
v
}
while (numCols > lastCol) {
colPtrs.append(nnz)
colPtrs(lastCol + 1) = nnz
lastCol += 1
}
new SparseMatrix(numRows, numCols, colPtrs.toArray, entries.map(_._1._1), values)
new SparseMatrix(numRows, numCols, colPtrs.toArray, sortedEntries.map(_._1), values)
}

/**
Expand All @@ -411,17 +410,42 @@ object SparseMatrix {
val length = math.ceil(numRows * numCols * density).toInt
val entries = Map[(Int, Int), Double]()
var i = 0
while (i < length) {
var rowIndex = rng.nextInt(numRows)
var colIndex = rng.nextInt(numCols)
while (entries.contains((rowIndex, colIndex))) {
rowIndex = rng.nextInt(numRows)
colIndex = rng.nextInt(numCols)
// Expected number of iterations is less than 1.5 * length
if (density < 0.34) {
while (i < length) {
var rowIndex = rng.nextInt(numRows)
var colIndex = rng.nextInt(numCols)
while (entries.contains((rowIndex, colIndex))) {
rowIndex = rng.nextInt(numRows)
colIndex = rng.nextInt(numCols)
}
entries += (rowIndex, colIndex) -> method(rng)
i += 1
}
} else { // selection - rejection method
var j = 0
val triesPerCol = math.ceil(length * 1.0 / numCols).toInt
val pool = numRows * numCols
// loop over columns so that the sort in fromCOO requires less sorting
while (i < length && j < numCols) {
var k = 0
val leftFromPool = (numCols - j) * numRows
while (k < triesPerCol) {
if (rng.nextDouble() < 1.0 * (length - i) / (pool - leftFromPool)) {
var rowIndex = rng.nextInt(numRows)
val colIndex = j
while (entries.contains((rowIndex, colIndex))) {
rowIndex = rng.nextInt(numRows)
}
entries += (rowIndex, colIndex) -> method(rng)
i += 1
}
k += 1
}
j += 1
}
entries += (rowIndex, colIndex) -> method(rng)
i += 1
}
SparseMatrix.fromCOO(numRows, numCols, entries.toArray.sortBy(v => (v._1._2, v._1._1)))
SparseMatrix.fromCOO(numRows, numCols, entries.toArray.map(v => (v._1._1, v._1._2, v._2)))
}

/**
Expand Down Expand Up @@ -462,12 +486,12 @@ object SparseMatrix {
val n = vector.size
vector match {
case sVec: SparseVector =>
val indices = sVec.indices.map(i => (i, i))
SparseMatrix.fromCOO(n, n, indices.zip(sVec.values))
val indices = sVec.indices
SparseMatrix.fromCOO(n, n, indices.zip(sVec.values).map(v => (v._1, v._1, v._2)))
case dVec: DenseVector =>
val values = dVec.values.zipWithIndex
val nnzVals = values.filter(v => v._1 != 0.0)
SparseMatrix.fromCOO(n, n, nnzVals.map(v => ((v._2, v._2), v._1)))
SparseMatrix.fromCOO(n, n, nnzVals.map(v => (v._2, v._2, v._1)))
}
}
}
Expand Down Expand Up @@ -613,58 +637,70 @@ object Matrices {
* @return a single `Matrix` composed of the matrices that were horizontally concatenated
*/
def horzcat(matrices: Array[Matrix]): Matrix = {
if (matrices.size == 1) {
return matrices(0)
} else if (matrices.size == 0) {
if (matrices.isEmpty) {
return new DenseMatrix(0, 0, Array[Double]())
} else if (matrices.size == 1) {
return matrices(0)
}
val numRows = matrices(0).numRows
var rowsMatch = true
var hasDense = false
var hasSparse = false
var numCols = 0
matrices.foreach { mat =>
if (numRows != mat.numRows) rowsMatch = false
mat match {
case sparse: SparseMatrix => hasSparse = true
case dense: DenseMatrix => hasDense = true
case dense: DenseMatrix => // empty on purpose
case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " +
s"SparseMatrix or DenseMatrix. Instead got: ${mat.getClass}")
}
numCols += mat.numCols
}
require(rowsMatch, "The number of rows of the matrices in this sequence, don't match!")

if (!hasSparse && hasDense) {
if (!hasSparse) {
new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray))
} else {
var startCol = 0
val entries: Array[((Int, Int), Double)] = matrices.flatMap {
val entries: Array[(Int, Int, Double)] = matrices.flatMap {
case spMat: SparseMatrix =>
var j = 0
var cnt = 0
val ptr = spMat.colPtrs
val data = spMat.rowIndices.zip(spMat.values).map { case (i, v) =>
cnt += 1
if (cnt <= ptr(j + 1)) {
((i, j + startCol), v)
} else {
while (ptr(j + 1) < cnt) {
j += 1
}
((i, j + startCol), v)
val colPtrs = spMat.colPtrs
val rowIndices = spMat.rowIndices
val values = spMat.values
val data = new Array[(Int, Int, Double)](values.length)
val nCols = spMat.numCols
while (j < nCols) {
var idx = colPtrs(j)
while (idx < colPtrs(j + 1)) {
val i = rowIndices(idx)
val v = values(idx)
data(idx) = (i, j + startCol, v)
idx += 1
}
j += 1
}
startCol += spMat.numCols
startCol += nCols
data
case dnMat: DenseMatrix =>
val nnzValues = dnMat.values.zipWithIndex.filter(v => v._1 != 0.0)
val data = nnzValues.map { case (v, i) =>
val rowIndex = i % dnMat.numRows
val colIndex = i / dnMat.numRows
((rowIndex, colIndex + startCol), v)
val data = new ArrayBuffer[(Int, Int, Double)]()
var j = 0
val nCols = dnMat.numCols
val nRows = dnMat.numRows
val values = dnMat.values
while (j < nCols) {
var i = 0
val indStart = j * nRows
while (i < nRows) {
val v = values(indStart + i)
if (v != 0.0) {
data.append((i, j + startCol, v))
}
i += 1
}
j += 1
}
startCol += dnMat.numCols
startCol += nCols
data
}
SparseMatrix.fromCOO(numRows, numCols, entries)
Expand All @@ -679,14 +715,13 @@ object Matrices {
* @return a single `Matrix` composed of the matrices that were vertically concatenated
*/
def vertcat(matrices: Array[Matrix]): Matrix = {
if (matrices.size == 1) {
return matrices(0)
} else if (matrices.size == 0) {
if (matrices.isEmpty) {
return new DenseMatrix(0, 0, Array[Double]())
} else if (matrices.size == 1) {
return matrices(0)
}
val numCols = matrices(0).numCols
var colsMatch = true
var hasDense = false
var hasSparse = false
var numRows = 0
var valsLength = 0
Expand All @@ -697,7 +732,6 @@ object Matrices {
hasSparse = true
valsLength += sparse.values.length
case dense: DenseMatrix =>
hasDense = true
valsLength += dense.values.length
case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " +
s"SparseMatrix or DenseMatrix. Instead got: ${mat.getClass}")
Expand All @@ -707,7 +741,7 @@ object Matrices {
}
require(colsMatch, "The number of rows of the matrices in this sequence, don't match!")

if (!hasSparse && hasDense) {
if (!hasSparse) {
val matData = matrices.zipWithIndex.flatMap { case (mat, ind) =>
val values = mat.toArray
for (j <- 0 until numCols) yield (j, ind,
Expand All @@ -716,34 +750,46 @@ object Matrices {
new DenseMatrix(numRows, numCols, matData.flatMap(_._3))
} else {
var startRow = 0
val entries: Array[((Int, Int), Double)] = matrices.flatMap {
val entries: Array[(Int, Int, Double)] = matrices.flatMap {
case spMat: SparseMatrix =>
var j = 0
var cnt = 0
val ptr = spMat.colPtrs
val data = spMat.rowIndices.zip(spMat.values).map { case (i, v) =>
cnt += 1
if (cnt <= ptr(j + 1)) {
((i + startRow, j), v)
} else {
while (ptr(j + 1) < cnt) {
j += 1
}
((i + startRow, j), v)
val colPtrs = spMat.colPtrs
val rowIndices = spMat.rowIndices
val values = spMat.values
val data = new Array[(Int, Int, Double)](values.length)
while (j < numCols) {
var idx = colPtrs(j)
while (idx < colPtrs(j + 1)) {
val i = rowIndices(idx)
val v = values(idx)
data(idx) = (i + startRow, j, v)
idx += 1
}
j += 1
}
startRow += spMat.numRows
data
case dnMat: DenseMatrix =>
val nnzValues = dnMat.values.zipWithIndex.filter(v => v._1 != 0.0)
val data = nnzValues.map { case (v, i) =>
val rowIndex = i % dnMat.numRows
val colIndex = i / dnMat.numRows
((rowIndex + startRow, colIndex), v)
val data = new ArrayBuffer[(Int, Int, Double)]()
var j = 0
val nCols = dnMat.numCols
val nRows = dnMat.numRows
val values = dnMat.values
while (j < nCols) {
var i = 0
val indStart = j * nRows
while (i < nRows) {
val v = values(indStart + i)
if (v != 0.0) {
data.append((i + startRow, j, v))
}
i += 1
}
j += 1
}
startRow += dnMat.numRows
startRow += nRows
data
}.sortBy(d => (d._1._2, d._1._1))
}
SparseMatrix.fromCOO(numRows, numCols, entries)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,25 @@ public void zerosMatrixConstruction() {
assertArrayEquals(done.toArray(), new double[]{1.0, 1.0, 1.0, 1.0}, 0.0);
}

@Test
public void sparseDenseConversion() {
int m = 3;
int n = 2;
double[] values = new double[]{1.0, 2.0, 4.0, 5.0};
double[] allValues = new double[]{1.0, 2.0, 0.0, 0.0, 4.0, 5.0};
int[] colPtrs = new int[]{0, 2, 4};
int[] rowIndices = new int[]{0, 1, 1, 2};

SparseMatrix spMat1 = new SparseMatrix(m, n, colPtrs, rowIndices, values);
DenseMatrix deMat1 = new DenseMatrix(m, n, allValues);

SparseMatrix spMat2 = deMat1.toSparse();
DenseMatrix deMat2 = spMat1.toDense();

assertArrayEquals(spMat1.toArray(), spMat2.toArray(), 0.0);
assertArrayEquals(deMat1.toArray(), deMat2.toArray(), 0.0);
}

@Test
public void concatenateMatrices() {
int m = 3;
Expand Down
Loading

0 comments on commit 3971c93

Please sign in to comment.