Skip to content

Commit

Permalink
bug fix for spearman corner case
Browse files Browse the repository at this point in the history
where numPartition >= size in the input RDDs
  • Loading branch information
dorx committed Aug 1, 2014
1 parent ef4ff00 commit 043ff83
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 21 deletions.
20 changes: 12 additions & 8 deletions mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object Statistics {

/**
* Compute the Pearson correlation matrix for the input RDD of Vectors.
* Returns NaN if either vector has 0 variance.
* Columns with 0 covariance produce NaN entries in the correlation matrix.
*
* @param X an RDD[Vector] for which the correlation matrix is to be computed.
* @return Pearson correlation matrix comparing columns in X.
Expand All @@ -39,7 +39,7 @@ object Statistics {

/**
* Compute the correlation matrix for the input RDD of Vectors using the specified method.
* Methods currently supported: `pearson` (default), `spearman`
* Methods currently supported: `pearson` (default), `spearman`.
*
* Note that for Spearman, a rank correlation, we need to create an RDD[Double] for each column
* and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
Expand All @@ -55,20 +55,24 @@ object Statistics {

/**
* Compute the Pearson correlation for the input RDDs.
* Columns with 0 covariance produce NaN entries in the correlation matrix.
* Returns NaN if either vector has 0 variance.
*
* Note: the two input RDDs need to have the same number of partitions.
*
* @param x RDD[Double] of the same cardinality as y
* @param y RDD[Double] of the same cardinality as x
* @param x RDD[Double] of the same cardinality as y.
* @param y RDD[Double] of the same cardinality as x.
* @return A Double containing the Pearson correlation between the two input RDD[Double]s
*/
def corr(x: RDD[Double], y: RDD[Double]): Double = Correlations.corr(x, y)

/**
* Compute the correlation for the input RDDs using the specified method.
* Methods currently supported: pearson (default), spearman
* Methods currently supported: `pearson` (default), `spearman`.
*
* Note: the two input RDDs need to have the same number of partitions.
*
* @param x RDD[Double] of the same cardinality as y
* @param y RDD[Double] of the same cardinality as x
* @param x RDD[Double] of the same cardinality as y.
* @param y RDD[Double] of the same cardinality as x.
* @param method String specifying the method to use for computing correlation.
* Supported: `pearson` (default), `spearman`
*@return A Double containing the correlation between the two input RDD[Double]s using the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
ranks(k) = getRanks(column)
}

val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X.partitions.size)
PearsonCorrelation.computeCorrelationMatrix(ranksMat)
}

Expand All @@ -89,20 +89,17 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
// add an extra element to signify the end of the list so that flatMap can flush the last
// batch of duplicates
val padded = iter ++
Iterator[((Double, Long), Long)](((Double.NaN, -1L), -1L))
var lastVal = 0.0
var firstRank = 0.0
val idBuffer = new ArrayBuffer[Long]()
val padded = iter ++ Iterator[((Double, Long), Long)](((Double.NaN, -1L), -1L))
val firstEntry = padded.next()
var lastVal = firstEntry._1._1
var firstRank = firstEntry._2.toDouble
val idBuffer = new ArrayBuffer[Long]() += firstEntry._1._2
padded.flatMap { case ((v, id), rank) =>
if (v == lastVal && id != Long.MinValue) {
if (v == lastVal && id != Long.MinValue) {
idBuffer += id
Iterator.empty
} else {
val entries = if (idBuffer.size == 0) {
// edge case for the first value matching the initial value of lastVal
Iterator.empty
} else if (idBuffer.size == 1) {
val entries = if (idBuffer.size == 1) {
Iterator((idBuffer(0), firstRank))
} else {
val averageRank = firstRank + (idBuffer.size - 1.0) / 2.0
Expand All @@ -119,8 +116,8 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
ranks
}

private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
val partitioner = new HashPartitioner(input.partitions.size)
private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], numPartitions: Int): RDD[Vector] = {
val partitioner = new HashPartitioner(numPartitions)
val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
cogrouped.map {
case (_, values: Array[Iterable[_]]) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ class CorrelationSuite extends FunSuite with LocalSparkContext {
val p1 = Statistics.corr(x, y, "pearson")
assert(approxEqual(expected, default))
assert(approxEqual(expected, p1))

// numPartitions >= size for input RDDs
for (numParts <- List(xData.size, xData.size * 2)) {
val x1 = sc.parallelize(xData, numParts)
val y1 = sc.parallelize(yData, numParts)
val p2 = Statistics.corr(x1, y1)
assert(approxEqual(expected, p2))
}
}

test("corr(x, y) spearman") {
Expand All @@ -54,6 +62,14 @@ class CorrelationSuite extends FunSuite with LocalSparkContext {
val expected = 0.5
val s1 = Statistics.corr(x, y, "spearman")
assert(approxEqual(expected, s1))

// numPartitions >= size for input RDDs
for (numParts <- List(xData.size, xData.size * 2)) {
val x1 = sc.parallelize(xData, numParts)
val y1 = sc.parallelize(yData, numParts)
val s2 = Statistics.corr(x1, y1, "spearman")
assert(approxEqual(expected, s2))
}
}

test("corr(X) default, pearson") {
Expand Down

0 comments on commit 043ff83

Please sign in to comment.