Skip to content

Commit

Permalink
addressed reviewer comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dorx committed Jul 6, 2014
1 parent 6b5b10b commit ee9d260
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,18 +212,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
exact: Boolean = true,
seed: Long = Utils.random.nextLong): RDD[(K, V)]= {

require(fractions.forall({case(k, v) => v >= 0.0}), "Invalid sampling rates.")
require(fractions.forall {case(k, v) => v >= 0.0}, "Invalid sampling rates.")

if (withReplacement) {
val samplingFunc = if (withReplacement) {
val counts = if (exact) Some(this.countByKey()) else None
val samplingFunc =
StratifiedSampler.getPoissonSamplingFunction(self, fractions, exact, counts, seed)
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
} else {
val samplingFunc =
StratifiedSampler.getBernoulliSamplingFunction(self, fractions, exact, seed)
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
}
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning=true)
}

/**
Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ abstract class RDD[T: ClassTag](

/**
* Return a sampled subset of this RDD.
*
*/
def sample(withReplacement: Boolean,
fraction: Double,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private[spark] object SamplingUtils {
private[spark] object PoissonBounds {

val delta = 1e-4 / 3.0
val epsilon = 1e-15

/**
* Compute the threshold for accepting items on the fly. The threshold value is a fairly small
Expand All @@ -87,7 +88,7 @@ private[spark] object PoissonBounds {
var ub = s
while (lb < ub - 1.0) {
val m = (lb + ub) / 2.0
val poisson = new PoissonDistribution(m, 1e-15)
val poisson = new PoissonDistribution(m, epsilon)
val y = poisson.inverseCumulativeProbability(1 - delta)
if (y > s) ub = m else lb = m
}
Expand All @@ -96,7 +97,7 @@ private[spark] object PoissonBounds {

def getMinCount(lmbd: Double): Double = {
if (lmbd == 0) return 0
val poisson = new PoissonDistribution(lmbd, 1e-15)
val poisson = new PoissonDistribution(lmbd, epsilon)
poisson.inverseCumulativeProbability(delta)
}

Expand All @@ -114,7 +115,7 @@ private[spark] object PoissonBounds {
var ub = s + math.sqrt(s / delta) // Chebyshev's inequality
while (lb < ub - 1.0) {
val m = (lb + ub) / 2.0
val poisson = new PoissonDistribution(m, 1e-15)
val poisson = new PoissonDistribution(m, epsilon)
val y = poisson.inverseCumulativeProbability(delta)
if (y >= s) ub = m else lb = m
}
Expand Down
Loading

0 comments on commit ee9d260

Please sign in to comment.