Commit

[SPARK-6577] [MLlib] [PySpark] SparseMatrix should be supported in PySpark

Adds support for SparseMatrix in PySpark.
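
For context, a minimal usage sketch of the API this commit adds — illustrative only, with the data borrowed from the new test case in tests.py below:

from pyspark.mllib.linalg import Matrices

# 3x4 CSC matrix: column pointers, row indices of the stored entries, values
sm = Matrices.sparse(3, 4, [0, 2, 2, 4, 4], [1, 2, 1, 2], [1.0, 2.0, 4.0, 5.0])
print sm[1, 0]       # 1.0
print sm[0, 1]       # 0.0 -- entry not stored
print sm.toArray()   # dense 3x4 numpy array

Lookups of entries that are not stored return 0.0, per the CSC convention.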

Author: MechCoder <manojkumarsivaraj334@gmail.com>

Closes #5355 from MechCoder/spark-6577 and squashes the following commits:

7492190 [MechCoder] More readable code for densifying
ea2c54b [MechCoder] Check bounds for indexing
454ef2c [MechCoder] Made the following changes 1. Used convert_to_array for array conversion. 2. Used F order for toArray 3. Minor improvements in speed.
db76caf [MechCoder] Add support for CSR matrix
29653e7 [MechCoder] Renamed indices to rowIndices and indptr to colPtrs
b6384fe [MechCoder] [SPARK-6577] SparseMatrix should be supported in PySpark
MechCoder authored and mengxr committed Apr 10, 2015
1 parent b5c51c8 commit e236081
Showing 2 changed files with 154 additions and 8 deletions.
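
The rename in 29653e7 moves from scipy's CSC vocabulary (`indices`/`indptr`) to the names used by the Scala API (`rowIndices`/`colPtrs`); the fields still correspond one-to-one, so a scipy CSC matrix can feed the new constructor directly. A sketch assuming scipy is available — scipy interop is not exercised by this commit:

import numpy as np
from scipy.sparse import csc_matrix
from pyspark.mllib.linalg import Matrices

sp = csc_matrix(np.array([[0.0, 4.0], [2.0, 0.0], [0.0, 5.0]]))
# scipy indptr -> colPtrs, scipy indices -> rowIndices, scipy data -> values
sm = Matrices.sparse(sp.shape[0], sp.shape[1], sp.indptr, sp.indices, sp.data)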
110 changes: 104 additions & 6 deletions python/pyspark/mllib/linalg.py
@@ -640,20 +640,24 @@ def toArray(self):
"""
raise NotImplementedError

    @staticmethod
    def _convert_to_array(array_like, dtype):
        """
        Convert Matrix attributes which are array-like or buffer to array.
        """
        if isinstance(array_like, basestring):
            return np.frombuffer(array_like, dtype=dtype)
        return np.asarray(array_like, dtype=dtype)


class DenseMatrix(Matrix):
    """
    Column-major dense matrix.
    """
    def __init__(self, numRows, numCols, values):
        Matrix.__init__(self, numRows, numCols)
-        if isinstance(values, basestring):
-            values = np.frombuffer(values, dtype=np.float64)
-        elif not isinstance(values, np.ndarray):
-            values = np.array(values, dtype=np.float64)
+        values = self._convert_to_array(values, np.float64)
        assert len(values) == numRows * numCols
-        if values.dtype != np.float64:
-            values.astype(np.float64)
        self.values = values

    def __reduce__(self):
@@ -670,6 +674,17 @@ def toArray(self):
"""
return self.values.reshape((self.numRows, self.numCols), order='F')

    def toSparse(self):
        """Convert to SparseMatrix"""
        # Positions of the non-zeros within the column-major values array.
        indices = np.nonzero(self.values)[0]
        # Non-zero count per column, padded with zeros for trailing empty
        # columns so that the cumsum yields numCols + 1 column pointers.
        colCounts = np.bincount(indices // self.numRows)
        colPtrs = np.cumsum(np.hstack(
            (0, colCounts, np.zeros(self.numCols - colCounts.size))))
        values = self.values[indices]
        rowIndices = indices % self.numRows

        return SparseMatrix(self.numRows, self.numCols, colPtrs, rowIndices, values)
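        # Illustrative trace (not from the patch): densifying the 3x4 matrix
        # used in the new test gives column-major values
        # [0, 1, 2, 0, 0, 0, 0, 4, 5, 0, 0, 0], so:
        #   indices            = [1, 2, 7, 8]      (non-zero positions)
        #   indices // numRows = [0, 0, 2, 2]      -> bincount [2, 0, 2]
        #   colPtrs            = cumsum([0, 2, 0, 2, 0]) = [0, 2, 2, 4, 4]
        #   rowIndices         = indices % numRows = [1, 2, 1, 2]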

    def __getitem__(self, indices):
        i, j = indices
        if i < 0 or i >= self.numRows:
@@ -687,6 +702,82 @@ def __eq__(self, other):
                all(self.values == other.values))


class SparseMatrix(Matrix):
    """Sparse Matrix stored in CSC format."""
    def __init__(self, numRows, numCols, colPtrs, rowIndices, values,
                 isTransposed=False):
        Matrix.__init__(self, numRows, numCols)
        self.isTransposed = isTransposed
        self.colPtrs = self._convert_to_array(colPtrs, np.int32)
        self.rowIndices = self._convert_to_array(rowIndices, np.int32)
        self.values = self._convert_to_array(values, np.float64)

        if self.isTransposed:
            if self.colPtrs.size != numRows + 1:
                raise ValueError("Expected colPtrs of size %d, got %d."
                                 % (numRows + 1, self.colPtrs.size))
        else:
            if self.colPtrs.size != numCols + 1:
                raise ValueError("Expected colPtrs of size %d, got %d."
                                 % (numCols + 1, self.colPtrs.size))
        if self.rowIndices.size != self.values.size:
            raise ValueError("Expected rowIndices of length %d, got %d."
                             % (self.values.size, self.rowIndices.size))

    def __reduce__(self):
        return SparseMatrix, (
            self.numRows, self.numCols, self.colPtrs.tostring(),
            self.rowIndices.tostring(), self.values.tostring(),
            self.isTransposed)

    def __getitem__(self, indices):
        i, j = indices
        if i < 0 or i >= self.numRows:
            raise ValueError("Row index %d is out of range [0, %d)"
                             % (i, self.numRows))
        if j < 0 or j >= self.numCols:
            raise ValueError("Column index %d is out of range [0, %d)"
                             % (j, self.numCols))

        # If a CSR matrix is given, then the row index should be searched
        # for in colPtrs, and the column index should be searched for in the
        # corresponding slice obtained from rowIndices.
        if self.isTransposed:
            j, i = i, j

        colStart = self.colPtrs[j]
        colEnd = self.colPtrs[j + 1]
        nz = self.rowIndices[colStart: colEnd]
        ind = np.searchsorted(nz, i) + colStart
        if ind < colEnd and self.rowIndices[ind] == i:
            return self.values[ind]
        else:
            return 0.0
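        # Illustrative trace (not from the patch): with isTransposed=True,
        # colPtrs [0, 2, 3, 5], rowIndices [0, 1, 2, 0, 2] and values
        # [3.0, 2.0, 4.0, 9.0, 8.0] (the matrix in the new test), a lookup of
        # element (2, 0) proceeds as:
        #   swap            -> j = 2, i = 0
        #   colStart:colEnd -> colPtrs[2]:colPtrs[3] = 3:5, nz = [0, 2]
        #   searchsorted([0, 2], 0) = 0 -> ind = 3
        #   rowIndices[3] == 0          -> return values[3] = 9.0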

    def toArray(self):
        """
        Return a numpy.ndarray
        """
        A = np.zeros((self.numRows, self.numCols), dtype=np.float64, order='F')
        for k in xrange(self.colPtrs.size - 1):
            startptr = self.colPtrs[k]
            endptr = self.colPtrs[k + 1]
            if self.isTransposed:
                A[k, self.rowIndices[startptr:endptr]] = self.values[startptr:endptr]
            else:
                A[self.rowIndices[startptr:endptr], k] = self.values[startptr:endptr]
        return A

    def toDense(self):
        densevals = np.reshape(
            self.toArray(), (self.numRows * self.numCols), order='F')
        return DenseMatrix(self.numRows, self.numCols, densevals)

    # TODO: More efficient implementation
    def __eq__(self, other):
        return np.all(self.toArray() == other.toArray())


class Matrices(object):
    @staticmethod
    def dense(numRows, numCols, values):
@@ -695,6 +786,13 @@ def dense(numRows, numCols, values):
"""
return DenseMatrix(numRows, numCols, values)

    @staticmethod
    def sparse(numRows, numCols, colPtrs, rowIndices, values):
        """
        Create a SparseMatrix
        """
        return SparseMatrix(numRows, numCols, colPtrs, rowIndices, values)


def _test():
    import doctest
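To make the CSC layout concrete, here is how the fields of the first matrix in the new test below decode (a worked example, not code from the patch):

# colPtrs    = [0, 2, 2, 4, 4]  -> column j owns values[colPtrs[j]:colPtrs[j+1]]
# rowIndices = [1, 2, 1, 2]     -> row of each stored value
# values     = [1.0, 2.0, 4.0, 5.0]
#
# column 0: rows 1, 2 -> 1.0, 2.0;  column 2: rows 1, 2 -> 4.0, 5.0
# columns 1 and 3 are empty (colPtrs does not advance)
#
# dense form:  [[0, 0, 0, 0],
#               [1, 0, 4, 0],
#               [2, 0, 5, 0]]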
52 changes: 50 additions & 2 deletions python/pyspark/mllib/tests.py
@@ -24,7 +24,7 @@
import tempfile
import array as pyarray

-from numpy import array, array_equal
+from numpy import array, array_equal, zeros
from py4j.protocol import Py4JJavaError

if sys.version_info[:2] <= (2, 6):
@@ -38,7 +38,7 @@

from pyspark.mllib.common import _to_java_object_rdd
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
-    DenseMatrix, Vectors, Matrices
+    DenseMatrix, SparseMatrix, Vectors, Matrices
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.random import RandomRDDs
from pyspark.mllib.stat import Statistics
@@ -144,6 +144,54 @@ def test_matrix_indexing(self):
            for j in range(2):
                self.assertEquals(mat[i, j], expected[i][j])

    def test_sparse_matrix(self):
        # Test sparse matrix creation.
        sm1 = SparseMatrix(
            3, 4, [0, 2, 2, 4, 4], [1, 2, 1, 2], [1.0, 2.0, 4.0, 5.0])
        self.assertEquals(sm1.numRows, 3)
        self.assertEquals(sm1.numCols, 4)
        self.assertEquals(sm1.colPtrs.tolist(), [0, 2, 2, 4, 4])
        self.assertEquals(sm1.rowIndices.tolist(), [1, 2, 1, 2])
        self.assertEquals(sm1.values.tolist(), [1.0, 2.0, 4.0, 5.0])

        # Test indexing
        expected = [
            [0, 0, 0, 0],
            [1, 0, 4, 0],
            [2, 0, 5, 0]]

        for i in range(3):
            for j in range(4):
                self.assertEquals(expected[i][j], sm1[i, j])
        self.assertTrue(array_equal(sm1.toArray(), expected))

        # Test conversion to dense and sparse.
        smnew = sm1.toDense().toSparse()
        self.assertEquals(sm1.numRows, smnew.numRows)
        self.assertEquals(sm1.numCols, smnew.numCols)
        self.assertTrue(array_equal(sm1.colPtrs, smnew.colPtrs))
        self.assertTrue(array_equal(sm1.rowIndices, smnew.rowIndices))
        self.assertTrue(array_equal(sm1.values, smnew.values))

        sm1t = SparseMatrix(
            3, 4, [0, 2, 3, 5], [0, 1, 2, 0, 2], [3.0, 2.0, 4.0, 9.0, 8.0],
            isTransposed=True)
        self.assertEquals(sm1t.numRows, 3)
        self.assertEquals(sm1t.numCols, 4)
        self.assertEquals(sm1t.colPtrs.tolist(), [0, 2, 3, 5])
        self.assertEquals(sm1t.rowIndices.tolist(), [0, 1, 2, 0, 2])
        self.assertEquals(sm1t.values.tolist(), [3.0, 2.0, 4.0, 9.0, 8.0])

        expected = [
            [3, 2, 0, 0],
            [0, 0, 4, 0],
            [9, 0, 8, 0]]

        for i in range(3):
            for j in range(4):
                self.assertEquals(expected[i][j], sm1t[i, j])
        self.assertTrue(array_equal(sm1t.toArray(), expected))


class ListTests(PySparkTestCase):

