[SPARK-6263] [MLLIB] Python MLlib API missing items: Utils
Implement missing APIs in PySpark.

MLUtils
* appendBias
* loadVectors

`kFold` is also missing; however, I am not sure whether a `ClassTag` can be passed or restored through Python.
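For reference, a rough pure-Python equivalent of `kFold` can be built from `RDD.randomSplit`, which sidesteps the `ClassTag` question by never crossing the JVM boundary. The helper below is only a sketch (`py_k_fold` is a hypothetical name, not part of this patch), and unlike the Scala `kFold` it does not guarantee exactly complementary samples:

def py_k_fold(rdd, k, seed=42):
    """Yield (training, validation) RDD pairs approximating k-fold CV."""
    # Split the data into k roughly equal, randomly sampled folds.
    folds = rdd.randomSplit([1.0 / k] * k, seed)
    for i, validation in enumerate(folds):
        # The training set is the union of every fold except the i-th.
        training = rdd.context.union([f for j, f in enumerate(folds) if j != i])
        yield training, validation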

Author: lewuathe <lewuathe@me.com>

Closes #5707 from Lewuathe/SPARK-6263 and squashes the following commits:

16863ea [lewuathe] Merge master
3fc27e7 [lewuathe] Merge branch 'master' into SPARK-6263
6084e9c [lewuathe] Resolv conflict
d2aa2a0 [lewuathe] Resolv conflict
9c329d8 [lewuathe] Fix efficiency
3a12a2d [lewuathe] Merge branch 'master' into SPARK-6263
1d4714b [lewuathe] Fix style
b29e2bc [lewuathe] Remove scipy dependencies
e32eb40 [lewuathe] Merge branch 'master' into SPARK-6263
25d3c9d [lewuathe] Remove unnecessary imports
7ec04db [lewuathe] Resolv conflict
1502d13 [lewuathe] Resolv conflict
d6bd416 [lewuathe] Check existence of scipy.sparse
5d555b1 [lewuathe] Construct scipy.sparse matrix
c345a44 [lewuathe] Merge branch 'master' into SPARK-6263
b8b5ef7 [lewuathe] Fix unnecessary sort method
d254be7 [lewuathe] Merge branch 'master' into SPARK-6263
62a9c7e [lewuathe] Fix appendBias return type
454c73d [lewuathe] Merge branch 'master' into SPARK-6263
a353354 [lewuathe] Remove unnecessary appendBias implementation
44295c2 [lewuathe] Merge branch 'master' into SPARK-6263
64f72ad [lewuathe] Merge branch 'master' into SPARK-6263
c728046 [lewuathe] Fix style
2980569 [lewuathe] [SPARK-6263] Python MLlib API missing items: Utils
Lewuathe authored and jkbradley committed Jul 1, 2015
1 parent 31b4a3d commit 184de91
Showing 3 changed files with 74 additions and 0 deletions.
@@ -75,6 +75,15 @@ private[python] class PythonMLLibAPI extends Serializable {
      minPartitions: Int): JavaRDD[LabeledPoint] =
    MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions)

  /**
   * Loads and serializes vectors saved with `RDD#saveAsTextFile`.
   * @param jsc Java SparkContext
   * @param path file or directory path in any Hadoop-supported file system URI
   * @return serialized vectors in an RDD
   */
  def loadVectors(jsc: JavaSparkContext, path: String): RDD[Vector] =
    MLUtils.loadVectors(jsc.sc, path)

  private def trainRegressionModel(
      learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel],
      data: JavaRDD[LabeledPoint],
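On the Python side, this JVM entry point is reached through MLlib's Py4J plumbing. The sketch below is a hypothetical re-derivation of what `callMLlibFunc("loadVectors", sc, path)` does in Spark 1.4-era internals (`call_mllib_func_sketch` is an illustrative name, not a Spark API):

from pyspark.mllib.common import _java2py, _py2java

def call_mllib_func_sketch(sc, name, *args):
    # Look up the named method on a JVM-side PythonMLLibAPI instance (via Py4J).
    api = getattr(sc._jvm.PythonMLLibAPI(), name)
    # Convert each Python argument; a SparkContext becomes its JavaSparkContext.
    java_args = [_py2java(sc, a) for a in args]
    # Invoke the JVM method and wrap the returned RDD[Vector] back into Python.
    return _java2py(sc, api(*java_args))

Calling `call_mllib_func_sketch(sc, "loadVectors", sc, path)` then lands in the `loadVectors` wrapper above.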
43 changes: 43 additions & 0 deletions python/pyspark/mllib/tests.py
@@ -54,6 +54,7 @@
from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import StandardScaler, ElementwiseProduct
from pyspark.mllib.util import LinearDataGenerator
from pyspark.mllib.util import MLUtils
from pyspark.serializers import PickleSerializer
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
@@ -1290,6 +1291,48 @@ def func(rdd):
        self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2)


class MLUtilsTests(MLlibTestCase):
    def test_append_bias(self):
        data = [2.0, 2.0, 2.0]
        ret = MLUtils.appendBias(data)
        self.assertEqual(ret[3], 1.0)
        self.assertEqual(type(ret), DenseVector)

    def test_append_bias_with_vector(self):
        data = Vectors.dense([2.0, 2.0, 2.0])
        ret = MLUtils.appendBias(data)
        self.assertEqual(ret[3], 1.0)
        self.assertEqual(type(ret), DenseVector)

    def test_append_bias_with_sp_vector(self):
        data = Vectors.sparse(3, {0: 2.0, 2: 2.0})
        expected = Vectors.sparse(4, {0: 2.0, 2: 2.0, 3: 1.0})
        # Returned value must be SparseVector
        ret = MLUtils.appendBias(data)
        self.assertEqual(ret, expected)
        self.assertEqual(type(ret), SparseVector)

    def test_load_vectors(self):
        import shutil
        data = [
            [1.0, 2.0, 3.0],
            [1.0, 2.0, 3.0]
        ]
        temp_dir = tempfile.mkdtemp()
        load_vectors_path = os.path.join(temp_dir, "test_load_vectors")
        try:
            self.sc.parallelize(data).saveAsTextFile(load_vectors_path)
            ret_rdd = MLUtils.loadVectors(self.sc, load_vectors_path)
            ret = ret_rdd.collect()
            self.assertEqual(len(ret), 2)
            self.assertEqual(ret[0], DenseVector([1.0, 2.0, 3.0]))
            self.assertEqual(ret[1], DenseVector([1.0, 2.0, 3.0]))
        except Exception as e:
            self.fail("Unable to load the saved vectors: %s" % str(e))
        finally:
            # Remove the whole temp directory, not just the saved output path.
            shutil.rmtree(temp_dir)


if __name__ == "__main__":
    if not _have_scipy:
        print("NOTE: Skipping SciPy tests as it does not seem to be installed")
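For context on what `test_load_vectors` exercises: `saveAsTextFile` writes each vector in its text form (one per line, e.g. `[1.0,2.0,3.0]`), and `loadVectors` parses those lines back into vectors. A minimal round trip, assuming an active SparkContext `sc` and `/tmp/vecs` as a scratch path:

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils

# Save two dense vectors as text, then read them back through the JVM loader.
sc.parallelize([Vectors.dense([1.0, 2.0, 3.0])]).saveAsTextFile("/tmp/vecs")
print(MLUtils.loadVectors(sc, "/tmp/vecs").collect())  # [DenseVector([1.0, 2.0, 3.0])]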
22 changes: 22 additions & 0 deletions python/pyspark/mllib/util.py
@@ -169,6 +169,28 @@ def loadLabeledPoints(sc, path, minPartitions=None):
        minPartitions = minPartitions or min(sc.defaultParallelism, 2)
        return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)

    @staticmethod
    def appendBias(data):
        """
        Returns a new vector with `1.0` (bias) appended to
        the end of the input vector.
        """
        vec = _convert_to_vector(data)
        if isinstance(vec, SparseVector):
            newIndices = np.append(vec.indices, len(vec))
            newValues = np.append(vec.values, 1.0)
            return SparseVector(len(vec) + 1, newIndices, newValues)
        else:
            return _convert_to_vector(np.append(vec.toArray(), 1.0))

    @staticmethod
    def loadVectors(sc, path):
        """
        Loads vectors saved using `RDD[Vector].saveAsTextFile`
        with the default number of partitions.
        """
        return callMLlibFunc("loadVectors", sc, path)


class Saveable(object):
"""
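The sparse branch in `appendBias` above preserves sparsity rather than densifying the input. The expected behavior for both vector types, taken directly from the new tests:

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils

MLUtils.appendBias(Vectors.dense([2.0, 2.0, 2.0]))
# -> DenseVector([2.0, 2.0, 2.0, 1.0])
MLUtils.appendBias(Vectors.sparse(3, {0: 2.0, 2: 2.0}))
# -> SparseVector(4, {0: 2.0, 2: 2.0, 3: 1.0})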
