Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
	sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
yhuai committed Jul 29, 2014
2 parents e5f8df5 + dc96536 commit 4ceeb66
Showing 13 changed files with 731 additions and 73 deletions.
69 changes: 68 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,7 +17,7 @@

package org.apache.spark.api.java

import java.util.{Comparator, List => JList}
import java.util.{Comparator, List => JList, Map => JMap}
import java.lang.{Iterable => JIterable}

import scala.collection.JavaConversions._
@@ -129,6 +129,73 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed))

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* If `exact` is set to false, create the sample via simple random sampling, with one pass
* over the RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
* the RDD to create a sample whose size is exactly equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
exact: Boolean,
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, exact, seed))

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* If `exact` is set to false, create the sample via simple random sampling, with one pass
* over the RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
* the RDD to create a sample whose size is exactly equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*
* Use Utils.random.nextLong as the default seed for the random number generator
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
exact: Boolean): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, exact, Utils.random.nextLong)

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* Produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values with one pass over the RDD via
* simple random sampling.
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
seed: Long): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, false, seed)

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* Produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values with one pass over the RDD via
* simple random sampling.
*
* Use Utils.random.nextLong as the default seed for the random number generator
*/
def sampleByKey(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, false, Utils.random.nextLong)
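
For orientation (not part of this commit), a minimal sketch of how these overloads could be driven from Scala; the SparkContext setup, key names, and sampling rates below are illustrative assumptions:

import java.util.{HashMap => JHashMap}
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaPairRDD

// Hypothetical local context and data; `fractions` maps each key to its sampling rate.
val sc = new SparkContext("local", "sampleByKeyJavaApi")
val pairs = JavaPairRDD.fromRDD(sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3), ("b", 4))))
val fractions = new JHashMap[String, Double]()
fractions.put("a", 0.5)
fractions.put("b", 1.0)
// Approximate single-pass sample with a fixed seed (third overload above).
val approxSample = pairs.sampleByKey(false, fractions, 42L)
// Exact per-key sample sizes at the cost of extra passes (first overload above).
val exactSample = pairs.sampleByKey(false, fractions, true, 42L)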

/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
@@ -550,11 +550,11 @@ private[spark] object PythonRDD extends Logging {
def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
pyRDD.rdd.mapPartitions { iter =>
val unpickle = new Unpickler
// TODO: Figure out why flatMap is necessary for pyspark
iter.flatMap { row =>
unpickle.loads(row) match {
// in case objects are pickled in batch mode
case objs: java.util.ArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap)
// In case the partition doesn't have a collection
// not in batch mode
case obj: JMap[String @unchecked, _] => Seq(obj.toMap)
}
}
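
On the TODO above: each unpickled row is either a batch (a list of maps) or a single map, so a plain map would yield a mix of lists and maps, while flatMap flattens both shapes into one stream. A hedged sketch of that normalization, written outside the project's code and relying on the same JavaConversions implicits the snippet above already uses:

import java.util.{ArrayList => JArrayList, Map => JMap}
import scala.collection.JavaConversions._

// Illustrative helper: turn one unpickled row into zero or more Scala maps.
def rowToMaps(decoded: AnyRef): Seq[Map[String, Any]] = decoded match {
  case objs: JArrayList[JMap[String, Any] @unchecked] => objs.map(_.toMap) // batch mode
  case obj: JMap[String @unchecked, Any @unchecked]   => Seq(obj.toMap)    // single row
}
// iter.flatMap(row => rowToMaps(unpickle.loads(row))) then yields one flat iterator of maps.
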
54 changes: 45 additions & 9 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -19,12 +19,10 @@ package org.apache.spark.rdd

import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util.Date
import java.util.{HashMap => JHashMap}
import java.util.{Date, HashMap => JHashMap}

import scala.collection.{Map, mutable}
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

@@ -34,19 +32,19 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob,
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}

import org.apache.spark._
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.SparkHadoopWriter
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.random.StratifiedSamplingUtils

/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -195,6 +193,41 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
foldByKey(zeroValue, defaultPartitioner(self))(func)
}

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* If `exact` is set to false, create the sample via simple random sampling, with one pass
* over the RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values; otherwise, use
* additional passes over the RDD to create a sample whose size is exactly equal to the sum of
* math.ceil(numItems * samplingRate) over all key values with a 99.99% confidence. When sampling
* without replacement, we need one additional pass over the RDD to guarantee sample size;
* when sampling with replacement, we need two additional passes.
*
* @param withReplacement whether to sample with or without replacement
* @param fractions map of specific keys to sampling rates
* @param seed seed for the random number generator
* @param exact whether sample size needs to be exactly math.ceil(fraction * size) per key
* @return RDD containing the sampled subset
*/
def sampleByKey(withReplacement: Boolean,
fractions: Map[K, Double],
exact: Boolean = false,
seed: Long = Utils.random.nextLong): RDD[(K, V)] = {

require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")

val samplingFunc = if (withReplacement) {
StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, exact, seed)
} else {
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, exact, seed)
}
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
}
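
A usage sketch for the Scala API (not part of this diff; the context, keys, and rates are made-up values), showing the default and exact forms side by side:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // brings the PairRDDFunctions implicits into scope

// Hypothetical data: stratify by plan type, keeping 10% of "free" and 80% of "paid" records.
val sc = new SparkContext("local", "sampleByKeyScalaApi")
val users = sc.parallelize(Seq(("free", "u1"), ("free", "u2"), ("paid", "u3"), ("paid", "u4")))
val fractions = Map("free" -> 0.1, "paid" -> 0.8)

// Single pass, approximate per-key sizes (`exact` defaults to false, `seed` to a random value).
val approxSample = users.sampleByKey(withReplacement = false, fractions = fractions)
// Extra pass(es) to hit the per-key sizes exactly, with a fixed seed for reproducibility.
val exactSample = users.sampleByKey(withReplacement = false, fractions = fractions,
  exact = true, seed = 11L)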

/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
@@ -531,6 +564,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

/**
* Return the key-value pairs in this RDD to the master as a Map.
*
* Warning: this doesn't return a multimap (so if you have multiple values for the same key, only
* one value per key is preserved in the returned map).
*/
def collectAsMap(): Map[K, V] = {
val data = self.collect()
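
A small illustration of the warning above (assumes an existing SparkContext `sc`; not part of this commit):

// Duplicate keys collapse: only one of the two "a" values survives in the returned Map.
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
pairs.collectAsMap() // e.g. Map("a" -> 2, "b" -> 3)
// To keep every value, group first; this returns Map[K, Iterable[V]] instead.
pairs.groupByKey().collectAsMap()
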
@@ -81,21 +81,83 @@ private[spark] object SamplingUtils {
* ~ Binomial(total, fraction) and our choice of q guarantees 1-delta, or 0.9999 success
* rate, where success rate is defined the same as in sampling with replacement.
*
* The smallest sampling rate supported is 1e-10 (in order to avoid running into the limit of the
* RNG's resolution).
*
* @param sampleSizeLowerBound sample size
* @param total size of RDD
* @param withReplacement whether sampling with replacement
* @return a sampling rate that guarantees sufficient sample size with 99.99% success rate
*/
def computeFractionForSampleSize(sampleSizeLowerBound: Int, total: Long,
withReplacement: Boolean): Double = {
val fraction = sampleSizeLowerBound.toDouble / total
if (withReplacement) {
val numStDev = if (sampleSizeLowerBound < 12) 9 else 5
fraction + numStDev * math.sqrt(fraction / total)
PoissonBounds.getUpperBound(sampleSizeLowerBound) / total
} else {
val delta = 1e-4
val gamma = - math.log(delta) / total
math.min(1, fraction + gamma + math.sqrt(gamma * gamma + 2 * gamma * fraction))
val fraction = sampleSizeLowerBound.toDouble / total
BinomialBounds.getUpperBound(1e-4, total, fraction)
}
}
}
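
A rough worked example (hypothetical numbers; these helpers are private[spark], so real callers live inside the org.apache.spark namespace): asking for at least 200 items out of 1,000,000 no longer yields the naive rate 2e-4 but an inflated rate that makes undershooting the target very unlikely.

// Assumed target: a sample of at least 200 items from 1,000,000 records.
val total = 1000000L
val lowerBound = 200

// With replacement: PoissonBounds.getUpperBound(200) ~= 200 + 6 * sqrt(200) ~= 284.9,
// so the returned fraction is roughly 2.85e-4 rather than 2e-4.
val fWith = SamplingUtils.computeFractionForSampleSize(lowerBound, total, withReplacement = true)

// Without replacement: BinomialBounds.getUpperBound(1e-4, 1000000, 2e-4) adds
// gamma + sqrt(gamma^2 + 2 * gamma * fraction) with gamma = -ln(1e-4) / n, roughly 2.7e-4.
val fWithout = SamplingUtils.computeFractionForSampleSize(lowerBound, total, withReplacement = false)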

/**
* Utility functions that help us determine bounds on adjusted sampling rate to guarantee exact
* sample sizes with high confidence when sampling with replacement.
*/
private[spark] object PoissonBounds {

/**
* Returns a lambda such that Pr[X > s] is very small, where X ~ Pois(lambda).
*/
def getLowerBound(s: Double): Double = {
math.max(s - numStd(s) * math.sqrt(s), 1e-15)
}

/**
* Returns a lambda such that Pr[X < s] is very small, where X ~ Pois(lambda).
*
* @param s sample size
*/
def getUpperBound(s: Double): Double = {
math.max(s + numStd(s) * math.sqrt(s), 1e-10)
}

private def numStd(s: Double): Double = {
// TODO: Make it tighter.
if (s < 6.0) {
12.0
} else if (s < 16.0) {
9.0
} else {
6.0
}
}
}

/**
* Utility functions that help us determine bounds on adjusted sampling rate to guarantee exact
* sample size with high confidence when sampling without replacement.
*/
private[spark] object BinomialBounds {

val minSamplingRate = 1e-10

/**
* Returns a threshold `p` such that if we conduct n Bernoulli trials with success rate = `p`,
* it is very unlikely to have more than `fraction * n` successes.
*/
def getLowerBound(delta: Double, n: Long, fraction: Double): Double = {
val gamma = - math.log(delta) / n * (2.0 / 3.0)
fraction + gamma - math.sqrt(gamma * gamma + 3 * gamma * fraction)
}

/**
* Returns a threshold `p` such that if we conduct n Bernoulli trials with success rate = `p`,
* it is very unlikely to have less than `fraction * n` successes.
*/
def getUpperBound(delta: Double, n: Long, fraction: Double): Double = {
val gamma = - math.log(delta) / n
math.min(1,
math.max(minSamplingRate, fraction + gamma + math.sqrt(gamma * gamma + 2 * gamma * fraction)))
}
}
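
A quick numeric sanity check of the two bound helpers (assumed inputs; again only callable from within org.apache.spark):

// Poisson case, target s = 100: numStd(100) = 6 and sqrt(100) = 10.
PoissonBounds.getLowerBound(100.0) // 100 - 60 = 40.0; Pr[Pois(40) > 100] is negligible
PoissonBounds.getUpperBound(100.0) // 100 + 60 = 160.0; Pr[Pois(160) < 100] is negligible

// Bernoulli case, n = 1e6 trials at fraction = 1e-3 with delta = 1e-4:
BinomialBounds.getLowerBound(1e-4, 1000000L, 1e-3) // ~ 8.7e-4, a bit below the target rate
BinomialBounds.getUpperBound(1e-4, 1000000L, 1e-3) // ~ 1.15e-3, a bit above the target rate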