Commit 2439d71

Remove @Experimental annotations in core, streaming for items that existed in 1.2.0 or before

srowen committed Nov 1, 2015
1 parent 643c49c commit 2439d71
Showing 15 changed files with 2 additions and 75 deletions.
10 changes: 1 addition & 9 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -45,7 +45,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor

import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
@@ -870,8 +870,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/**
* :: Experimental ::
*
* Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
* (useful for binary data)
*
@@ -902,7 +900,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* list of inputs.
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
@Experimental
def binaryFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
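As context for binaryFiles losing its @Experimental tag, a minimal usage sketch from the Scala API; the path and the size computation are made-up illustrations, and sc is assumed to be an existing SparkContext.

// Each input file becomes one (path, PortableDataStream) record.
val streams = sc.binaryFiles("hdfs:///data/images", minPartitions = 8)
val sizes = streams.map { case (path, stream) =>
  (path, stream.toArray().length)   // toArray() materializes the file's bytes
}
sizes.take(5).foreach(println)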
@@ -922,8 +919,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/**
* :: Experimental ::
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* '''Note:''' We ensure that the byte array for each record in the resulting RDD
@@ -936,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* @return An RDD of data with values, represented as byte arrays
*/
@Experimental
def binaryRecords(
path: String,
recordLength: Int,
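A similar hedged sketch for binaryRecords, assuming a hypothetical input file of fixed 16-byte records and an existing SparkContext sc:

// Every element of the resulting RDD is an Array[Byte] of exactly recordLength bytes.
val records = sc.binaryRecords("hdfs:///data/readings.bin", recordLength = 16)
println(records.count())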
@@ -1963,10 +1957,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/**
* :: Experimental ::
* Submit a job for execution and return a FutureJob holding the result.
*/
@Experimental
def submitJob[T, U, R](
rdd: RDD[T],
processPartition: Iterator[T] => U,
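For illustration, a sketch of submitJob with a driver-side result handler; the per-partition sums and the one-minute wait are arbitrary choices, and sc is assumed to be an existing SparkContext.

import scala.concurrent.Await
import scala.concurrent.duration._

val data = sc.parallelize(1 to 1000, numSlices = 4)
val partialSums = new Array[Int](4)
val future = sc.submitJob(
  data,
  (it: Iterator[Int]) => it.sum,                        // processPartition, runs on executors
  0 until 4,                                            // partitions to compute
  (index: Int, sum: Int) => partialSums(index) = sum,   // resultHandler, runs on the driver
  partialSums.sum)                                      // resultFunc, evaluated once every partition is done
println(Await.result(future, 1.minute))                 // 500500, the sum of 1 to 1000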
@@ -24,7 +24,6 @@ import scala.reflect.ClassTag

import org.apache.spark.Partitioner
import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.RDD
@@ -209,25 +208,19 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double])
srdd.meanApprox(timeout, confidence)

/**
* :: Experimental ::
* Approximate operation to return the mean within a timeout.
*/
@Experimental
def meanApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.meanApprox(timeout)

/**
* :: Experimental ::
* Approximate operation to return the sum within a timeout.
*/
@Experimental
def sumApprox(timeout: Long, confidence: JDouble): PartialResult[BoundedDouble] =
srdd.sumApprox(timeout, confidence)

/**
* :: Experimental ::
* Approximate operation to return the sum within a timeout.
*/
@Experimental
def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout)

/**
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
@@ -159,7 +158,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
sampleByKey(withReplacement, fractions, Utils.random.nextLong)

/**
* ::Experimental::
* Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
* math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
*
@@ -169,14 +167,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* additional pass over the RDD to guarantee sample size; when sampling with replacement, we need
* two additional passes.
*/
@Experimental
def sampleByKeyExact(withReplacement: Boolean,
fractions: JMap[K, Double],
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions.asScala, seed))

/**
* ::Experimental::
* Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
* math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
*
@@ -188,7 +184,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*
* Use Utils.random.nextLong as the default seed for the random number generator.
*/
@Experimental
def sampleByKeyExact(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] =
sampleByKeyExact(withReplacement, fractions, Utils.random.nextLong)
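The Scala counterpart in PairRDDFunctions (touched later in this diff) has the same semantics; a minimal sketch with made-up data, assuming an existing SparkContext sc:

// fractions gives a per-key sampling rate; the sampled size per key is exactly
// math.ceil(numItemsForKey * rate), at the cost of extra passes over the data.
val byLabel = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("a", 4), ("b", 5), ("b", 6)))
val fractions = Map("a" -> 0.5, "b" -> 1.0)
val sample = byLabel.sampleByKeyExact(withReplacement = false, fractions = fractions, seed = 42L)
sample.collect().foreach(println)   // exactly two "a" pairs and two "b" pairs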

@@ -300,20 +295,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
def countByKey(): java.util.Map[K, Long] = mapAsSerializableJavaMap(rdd.countByKey())

/**
* :: Experimental ::
* Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
@Experimental
def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout).map(mapAsSerializableJavaMap)

/**
* :: Experimental ::
* Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
@Experimental
def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
: PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout, confidence).map(mapAsSerializableJavaMap)
@@ -28,7 +28,6 @@ import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec

import org.apache.spark._
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
@@ -436,20 +435,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def count(): Long = rdd.count()

/**
* :: Experimental ::
* Approximate version of count() that returns a potentially incomplete result
* within a timeout, even if not all tasks have finished.
*/
@Experimental
def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
rdd.countApprox(timeout, confidence)

/**
* :: Experimental ::
* Approximate version of count() that returns a potentially incomplete result
* within a timeout, even if not all tasks have finished.
*/
@Experimental
def countApprox(timeout: Long): PartialResult[BoundedDouble] =
rdd.countApprox(timeout)

@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark._
import org.apache.spark.AccumulatorParam._
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
@@ -266,8 +265,6 @@ class JavaSparkContext(val sc: SparkContext)
new JavaPairRDD(sc.binaryFiles(path, minPartitions))

/**
* :: Experimental ::
*
* Read a directory of binary files from HDFS, a local file system (available on all nodes),
* or any Hadoop-supported file system URI as a byte array. Each file is read as a single
* record and returned in a key-value pair, where the key is the path of each file,
@@ -294,19 +291,15 @@ class JavaSparkContext(val sc: SparkContext)
*
* @note Small files are preferred; very large files may cause bad performance.
*/
@Experimental
def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))

/**
* :: Experimental ::
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
* @return An RDD of data with values, represented as byte arrays
*/
@Experimental
def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = {
new JavaRDD(sc.binaryRecords(path, recordLength))
}
@@ -24,17 +24,14 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._

import org.apache.spark.{Logging, SparkException}
import org.apache.spark.annotation.Experimental
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{SerializableConfiguration, Utils}

/**
* :: Experimental ::
* A trait for use with reading custom classes in PySpark. Implement this trait and add custom
* transformation code by overriding the convert method.
*/
@Experimental
trait Converter[T, + U] extends Serializable {
def convert(obj: T): U
}
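To illustrate the trait, a hypothetical converter that turns Hadoop Text keys into plain Strings; the class name is made up, and such a class would typically be referenced by name from PySpark's Hadoop input methods (e.g. their keyConverter argument).

import org.apache.hadoop.io.Text
import org.apache.spark.api.python.Converter

class TextToStringConverter extends Converter[Any, String] {
  // Called once per key (or value) read from the InputFormat.
  override def convert(obj: Any): String = obj match {
    case t: Text => t.toString
    case other   => other.toString
  }
}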
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}

import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil

/**
@@ -129,7 +128,6 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
* @note TaskAttemptContext is not serializable resulting in the confBytes construct
* @note CombineFileSplit is not serializable resulting in the splitBytes construct
*/
@Experimental
class PortableDataStream(
isplit: CombineFileSplit,
context: TaskAttemptContext,
@@ -17,13 +17,9 @@

package org.apache.spark.partial

import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* A Double value with error bars and associated confidence.
*/
@Experimental
class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
override def toString(): String = "[%.3f, %.3f]".format(low, high)
}
@@ -17,9 +17,6 @@

package org.apache.spark.partial

import org.apache.spark.annotation.Experimental

@Experimental
class PartialResult[R](initialVal: R, isFinal: Boolean) {
private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None
private var failure: Option[Exception] = None
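A brief sketch of how these two classes surface in practice, assuming an existing SparkContext sc; the timeout and confidence values are arbitrary.

import org.apache.spark.partial.{BoundedDouble, PartialResult}

// countApprox returns once the timeout expires or the job finishes, whichever comes first.
val rdd = sc.parallelize(1 to 10000000, 200)
val partial: PartialResult[BoundedDouble] = rdd.countApprox(timeout = 2000L, confidence = 0.90)
val estimate = partial.initialValue                            // the approximate value available right away
println(s"count in [${estimate.low}, ${estimate.high}], mean ${estimate.mean}")
partial.onComplete(b => println(s"final answer: ${b.mean}"))   // fires if/when the job completes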
@@ -74,10 +74,8 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
}

/**
* :: Experimental ::
* Approximate operation to return the mean within a timeout.
*/
@Experimental
def meanApprox(
timeout: Long,
confidence: Double = 0.95): PartialResult[BoundedDouble] = self.withScope {
@@ -87,10 +85,8 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
}

/**
* :: Experimental ::
* Approximate operation to return the sum within a timeout.
*/
@Experimental
def sumApprox(
timeout: Long,
confidence: Double = 0.95): PartialResult[BoundedDouble] = self.withScope {
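A short sketch of the two approximate operations above, assuming an existing SparkContext sc; the data and the five-second budget are made up.

val values = sc.parallelize(1 to 1000000, 200).map(_.toDouble)
val meanEst = values.meanApprox(timeout = 5000L)                  // default confidence 0.95
println(meanEst.initialValue)                                     // prints a BoundedDouble as "[low, high]"
val sumEst = values.sumApprox(timeout = 5000L, confidence = 0.99)
println(sumEst.getFinalValue())                                   // blocks until the exact result is known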
@@ -274,7 +274,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}

/**
* ::Experimental::
* Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
* math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
*
@@ -289,7 +288,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* @param seed seed for the random number generator
* @return RDD containing the sampled subset
*/
@Experimental
def sampleByKeyExact(
withReplacement: Boolean,
fractions: Map[K, Double],
@@ -384,19 +382,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}

/**
* :: Experimental ::
* Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
@Experimental
def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
: PartialResult[Map[K, BoundedDouble]] = self.withScope {
self.map(_._1).countByValueApprox(timeout, confidence)
}
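For example (made-up data, assuming an existing SparkContext sc):

// Each key maps to a BoundedDouble estimate of its count.
val pairs = sc.parallelize(1 to 1000000, 100).map(i => (i % 10, i))
val approxCounts = pairs.countByKeyApprox(timeout = 10000L, confidence = 0.90)
approxCounts.initialValue.foreach { case (key, bound) => println(s"$key -> $bound") }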

/**
* :: Experimental ::
*
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
Expand All @@ -413,7 +407,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* If `sp` equals 0, the sparse representation is skipped.
* @param partitioner Partitioner to use for the resulting RDD.
*/
@Experimental
def countApproxDistinctByKey(
p: Int,
sp: Int,
8 changes: 1 addition & 7 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.TextOutputFormat

import org.apache.spark._
import org.apache.spark.Partitioner._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
@@ -1119,11 +1119,9 @@ abstract class RDD[T: ClassTag](
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

/**
* :: Experimental ::
* Approximate version of count() that returns a potentially incomplete result
* within a timeout, even if not all tasks have finished.
*/
@Experimental
def countApprox(
timeout: Long,
confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
@@ -1152,10 +1150,8 @@ abstract class RDD[T: ClassTag](
}

/**
* :: Experimental ::
* Approximate version of countByValue().
*/
@Experimental
def countByValueApprox(timeout: Long, confidence: Double = 0.95)
(implicit ord: Ordering[T] = null)
: PartialResult[Map[T, BoundedDouble]] = withScope {
@@ -1174,7 +1170,6 @@ abstract class RDD[T: ClassTag](
}

/**
* :: Experimental ::
* Return approximate number of distinct elements in the RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
@@ -1190,7 +1185,6 @@ abstract class RDD[T: ClassTag](
* @param sp The precision value for the sparse set, between 0 and 32.
* If `sp` equals 0, the sparse representation is skipped.
*/
@Experimental
def countApproxDistinct(p: Int, sp: Int): Long = withScope {
require(p >= 4, s"p ($p) must be >= 4")
require(sp <= 32, s"sp ($sp) must be <= 32")
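To make the p/sp knobs concrete, a small sketch assuming an existing SparkContext sc; the data and parameter values are illustrative only.

val ids = sc.parallelize(1 to 1000000, 100).map(_ % 50000)
// Larger p improves accuracy at the cost of memory; a non-zero sp (greater than p)
// enables the sparse HyperLogLog++ representation, useful for low cardinalities.
println(ids.countApproxDistinct(p = 14, sp = 20))   // close to 50000
println(ids.countApproxDistinct(0.01))              // simpler overload: target relative standard deviation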
