Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into decisiontree-pyth…
Browse files Browse the repository at this point in the history
…on-new

Conflicts:
	mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
(not a real conflict; merged by concatenation)
  • Loading branch information
jkbradley committed Aug 1, 2014
2 parents 4801b40 + d88e695 commit e34c263
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import java.nio.{ByteBuffer, ByteOrder}
import scala.collection.JavaConversions._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.random.{RandomRDDGenerators => RG}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
Expand All @@ -34,6 +34,8 @@ import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.impurity.{Entropy, Gini, Impurity, Variance}
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.stat.correlation.CorrelationNames
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -234,7 +236,7 @@ class PythonMLLibAPI extends Serializable {
jsc: JavaSparkContext,
path: String,
minPartitions: Int): JavaRDD[Array[Byte]] =
MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint).toJavaRDD()
MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(serializeLabeledPoint)

private def trainRegressionModel(
trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
Expand Down Expand Up @@ -531,6 +533,36 @@ class PythonMLLibAPI extends Serializable {
dataJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
val data = dataJRDD.rdd.map(xBytes => deserializeDoubleVector(xBytes))
model.predict(data).map(serializeDouble)

/**
* Java stub for mllib Statistics.corr(X: RDD[Vector], method: String).
* Returns the correlation matrix serialized into a byte array understood by deserializers in
* pyspark.
*/
def corr(X: JavaRDD[Array[Byte]], method: String): Array[Byte] = {
val inputMatrix = X.rdd.map(deserializeDoubleVector(_))
val result = Statistics.corr(inputMatrix, getCorrNameOrDefault(method))
serializeDoubleMatrix(to2dArray(result))
}

/**
* Java stub for mllib Statistics.corr(x: RDD[Double], y: RDD[Double], method: String).
*/
def corr(x: JavaRDD[Array[Byte]], y: JavaRDD[Array[Byte]], method: String): Double = {
val xDeser = x.rdd.map(deserializeDouble(_))
val yDeser = y.rdd.map(deserializeDouble(_))
Statistics.corr(xDeser, yDeser, getCorrNameOrDefault(method))
}

// used by the corr methods to retrieve the name of the correlation method passed in via pyspark
private def getCorrNameOrDefault(method: String) = {
if (method == null) CorrelationNames.defaultCorrName else method
}

// Reformat a Matrix into Array[Array[Double]] for serialization
private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = {
val values = matrix.toArray
Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * matrix.numRows))
}

// Used by the *RDD methods to get default seed if not passed in from pyspark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,24 @@ import org.apache.spark.mllib.stat.correlation.Correlations
import org.apache.spark.rdd.RDD

/**
* API for statistical functions in MLlib
* API for statistical functions in MLlib.
*/
@Experimental
object Statistics {

/**
* :: Experimental ::
* Compute the Pearson correlation matrix for the input RDD of Vectors.
* Columns with 0 covariance produce NaN entries in the correlation matrix.
*
* @param X an RDD[Vector] for which the correlation matrix is to be computed.
* @return Pearson correlation matrix comparing columns in X.
*/
@Experimental
def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)

/**
* :: Experimental ::
* Compute the correlation matrix for the input RDD of Vectors using the specified method.
* Methods currently supported: `pearson` (default), `spearman`.
*
Expand All @@ -51,9 +54,11 @@ object Statistics {
* Supported: `pearson` (default), `spearman`
* @return Correlation matrix comparing columns in X.
*/
@Experimental
def corr(X: RDD[Vector], method: String): Matrix = Correlations.corrMatrix(X, method)

/**
* :: Experimental ::
* Compute the Pearson correlation for the input RDDs.
* Returns NaN if either vector has 0 variance.
*
Expand All @@ -64,9 +69,11 @@ object Statistics {
* @param y RDD[Double] of the same cardinality as x.
* @return A Double containing the Pearson correlation between the two input RDD[Double]s
*/
@Experimental
def corr(x: RDD[Double], y: RDD[Double]): Double = Correlations.corr(x, y)

/**
* :: Experimental ::
* Compute the correlation for the input RDDs using the specified method.
* Methods currently supported: `pearson` (default), `spearman`.
*
Expand All @@ -80,5 +87,6 @@ object Statistics {
*@return A Double containing the correlation between the two input RDD[Double]s using the
* specified method.
*/
@Experimental
def corr(x: RDD[Double], y: RDD[Double], method: String): Double = Correlations.corr(x, y, method)
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,43 +49,48 @@ private[stat] trait Correlation {
}

/**
* Delegates computation to the specific correlation object based on the input method name
*
* Currently supported correlations: pearson, spearman.
* After new correlation algorithms are added, please update the documentation here and in
* Statistics.scala for the correlation APIs.
*
* Maintains the default correlation type, pearson
* Delegates computation to the specific correlation object based on the input method name.
*/
private[stat] object Correlations {

// Note: after new types of correlations are implemented, please update this map
val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmanCorrelation))
val defaultCorrName: String = "pearson"
val defaultCorr: Correlation = nameToObjectMap(defaultCorrName)

def corr(x: RDD[Double], y: RDD[Double], method: String = defaultCorrName): Double = {
def corr(x: RDD[Double],
y: RDD[Double],
method: String = CorrelationNames.defaultCorrName): Double = {
val correlation = getCorrelationFromName(method)
correlation.computeCorrelation(x, y)
}

def corrMatrix(X: RDD[Vector], method: String = defaultCorrName): Matrix = {
def corrMatrix(X: RDD[Vector],
method: String = CorrelationNames.defaultCorrName): Matrix = {
val correlation = getCorrelationFromName(method)
correlation.computeCorrelationMatrix(X)
}

/**
* Match input correlation name with a known name via simple string matching
*
* private to stat for ease of unit testing
*/
private[stat] def getCorrelationFromName(method: String): Correlation = {
// Match input correlation name with a known name via simple string matching.
def getCorrelationFromName(method: String): Correlation = {
try {
nameToObjectMap(method)
CorrelationNames.nameToObjectMap(method)
} catch {
case nse: NoSuchElementException =>
throw new IllegalArgumentException("Unrecognized method name. Supported correlations: "
+ nameToObjectMap.keys.mkString(", "))
+ CorrelationNames.nameToObjectMap.keys.mkString(", "))
}
}
}

/**
* Maintains supported and default correlation names.
*
* Currently supported correlations: `pearson`, `spearman`.
* Current default correlation: `pearson`.
*
* After new correlation algorithms are added, please update the documentation here and in
* Statistics.scala for the correlation APIs.
*/
private[mllib] object CorrelationNames {

// Note: after new types of correlations are implemented, please update this map.
val nameToObjectMap = Map(("pearson", PearsonCorrelation), ("spearman", SpearmanCorrelation))
val defaultCorrName: String = "pearson"

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.mllib.api.python

import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

class PythonMLLibAPISuite extends FunSuite {
Expand Down Expand Up @@ -59,10 +59,25 @@ class PythonMLLibAPISuite extends FunSuite {
}

test("double serialization") {
for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue)) {
for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue, Double.NaN)) {
val bytes = py.serializeDouble(x)
val deser = py.deserializeDouble(bytes)
assert(x === deser)
// We use `equals` here for comparison because we cannot use `==` for NaN
assert(x.equals(deser))
}
}

test("matrix to 2D array") {
val values = Array[Double](0, 1.2, 3, 4.56, 7, 8)
val matrix = Matrices.dense(2, 3, values)
val arr = py.to2dArray(matrix)
val expected = Array(Array[Double](0, 3, 7), Array[Double](1.2, 4.56, 8))
assert(arr === expected)

// Test conversion for empty matrix
val empty = Array[Double]()
val emptyMatrix = Matrices.dense(0, 0, empty)
val empty2D = py.to2dArray(emptyMatrix)
assert(empty2D === Array[Array[Double]]())
}
}
6 changes: 5 additions & 1 deletion python/pyspark/mllib/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def _serialize_double(d):
"""
Serialize a double (float or numpy.float64) into a mutually understood format.
"""
if type(d) == float or type(d) == float64:
if type(d) == float or type(d) == float64 or type(d) == int or type(d) == long:
d = float64(d)
ba = bytearray(8)
_copyto(d, buffer=ba, offset=0, shape=[1], dtype=float64)
Expand Down Expand Up @@ -176,6 +176,10 @@ def _deserialize_double(ba, offset=0):
True
>>> _deserialize_double(_serialize_double(float64(0.0))) == 0.0
True
>>> _deserialize_double(_serialize_double(1)) == 1.0
True
>>> _deserialize_double(_serialize_double(1L)) == 1.0
True
>>> x = sys.float_info.max
>>> _deserialize_double(_serialize_double(sys.float_info.max)) == x
True
Expand Down
104 changes: 104 additions & 0 deletions python/pyspark/mllib/stat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Python package for statistical functions in MLlib.
"""

from pyspark.mllib._common import \
_get_unmangled_double_vector_rdd, _get_unmangled_rdd, \
_serialize_double, _serialize_double_vector, \
_deserialize_double, _deserialize_double_matrix

class Statistics(object):

@staticmethod
def corr(x, y=None, method=None):
"""
Compute the correlation (matrix) for the input RDD(s) using the
specified method.
Methods currently supported: I{pearson (default), spearman}.
If a single RDD of Vectors is passed in, a correlation matrix
comparing the columns in the input RDD is returned. Use C{method=}
to specify the method to be used for single RDD inout.
If two RDDs of floats are passed in, a single float is returned.
>>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
>>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
>>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
>>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
True
>>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
True
>>> Statistics.corr(x, y, "spearman")
0.5
>>> from math import isnan
>>> isnan(Statistics.corr(x, zeros))
True
>>> from linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
>>> Statistics.corr(rdd)
array([[ 1. , 0.05564149, nan, 0.40047142],
[ 0.05564149, 1. , nan, 0.91359586],
[ nan, nan, 1. , nan],
[ 0.40047142, 0.91359586, nan, 1. ]])
>>> Statistics.corr(rdd, method="spearman")
array([[ 1. , 0.10540926, nan, 0.4 ],
[ 0.10540926, 1. , nan, 0.9486833 ],
[ nan, nan, 1. , nan],
[ 0.4 , 0.9486833 , nan, 1. ]])
>>> try:
... Statistics.corr(rdd, "spearman")
... print "Method name as second argument without 'method=' shouldn't be allowed."
... except TypeError:
... pass
"""
sc = x.ctx
# Check inputs to determine whether a single value or a matrix is needed for output.
# Since it's legal for users to use the method name as the second argument, we need to
# check if y is used to specify the method name instead.
if type(y) == str:
raise TypeError("Use 'method=' to specify method name.")
if not y:
try:
Xser = _get_unmangled_double_vector_rdd(x)
except TypeError:
raise TypeError("corr called on a single RDD not consisted of Vectors.")
resultMat = sc._jvm.PythonMLLibAPI().corr(Xser._jrdd, method)
return _deserialize_double_matrix(resultMat)
else:
xSer = _get_unmangled_rdd(x, _serialize_double)
ySer = _get_unmangled_rdd(y, _serialize_double)
result = sc._jvm.PythonMLLibAPI().corr(xSer._jrdd, ySer._jrdd, method)
return result


def _test():
import doctest
from pyspark import SparkContext
globs = globals().copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)


if __name__ == "__main__":
_test()

0 comments on commit e34c263

Please sign in to comment.