[SPARK-12615] Remove some deprecated APIs in RDD/SparkContext
I looked at each case individually and it looks like they can all be removed. The only one I had to think twice about was toArray (I even thought about un-deprecating it, until I realized it was a problem in Java to have toArray returning java.util.List).

Author: Reynold Xin <rxin@databricks.com>

Closes #10569 from rxin/SPARK-12615.
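As an illustration of the toArray point above, here is a minimal, hypothetical caller sketch (the RDD.scala portion of the diff is not shown on this page). In Scala, collect() returns Array[T], whereas JavaRDD.collect() returns java.util.List, which is why keeping a Java-facing toArray name would have been misleading. The object name, master URL, and data below are illustrative assumptions, not part of this commit:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical caller showing the replacement for the removed toArray alias.
object ToArrayMigration {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("toArray-migration"))
    val rdd = sc.parallelize(1 to 5)
    // Before: rdd.toArray()  // deprecated alias of collect(); per the message above, among the removals
    // After:
    val result: Array[Int] = rdd.collect()
    println(result.mkString(", "))
    sc.stop()
  }
}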
rxin authored and JoshRosen committed Jan 5, 2016
1 parent 7676833 commit 8ce645d
Showing 22 changed files with 64 additions and 643 deletions.
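Most of the SparkContext removals in the diffs below are mechanical, since each deprecated entry point already forwarded to its replacement. As a hedged, caller-side sketch of the surviving APIs (the object name, master URL, and data are illustrative assumptions, not part of this commit):

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextMigration {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("sparkcontext-migration"))
    val rdd = sc.parallelize(1 to 10, numSlices = 2)

    // runJob: the overloads taking the ignored allowLocal flag are removed; use the flag-less version.
    val sums: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 1))

    // defaultMinSplits is removed; defaultMinPartitions is its replacement.
    val minPartitions: Int = sc.defaultMinPartitions

    // The rddToPairRDDFunctions-style forwarders on object SparkContext are removed, but the
    // equivalent implicits in the RDD companion object resolve automatically, so this still compiles.
    val counts = rdd.map(x => (x % 2, 1)).reduceByKey(_ + _).collect()

    println(sums.mkString(",") + " | " + minPartitions + " | " + counts.mkString(","))
    sc.stop()
  }
}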
core/src/main/scala/org/apache/spark/Aggregator.scala (8 changes: 0 additions, 8 deletions)
@@ -34,10 +34,6 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {

@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
combineValuesByKey(iter, null)

def combineValuesByKey(
iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
@@ -47,10 +43,6 @@ case class Aggregator[K, V, C] (
combiners.iterator
}

@deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]]) : Iterator[(K, C)] =
combineCombinersByKey(iter, null)

def combineCombinersByKey(
iter: Iterator[_ <: Product2[K, C]],
context: TaskContext): Iterator[(K, C)] = {
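The surviving Aggregator overloads take a TaskContext. Below is a hedged sketch of a caller after this change; Aggregator is a developer API, and everything here (object name, keys, the local master) is illustrative rather than taken from the commit:

import org.apache.spark.{Aggregator, SparkConf, SparkContext, TaskContext}

object AggregatorMigration {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("aggregator-migration"))
    // Sum values per key: createCombiner, mergeValue and mergeCombiners are all plain addition.
    val agg = Aggregator[String, Int, Int](v => v, _ + _, _ + _)
    val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), 2)
    val combined = pairs.mapPartitions { iter =>
      // The context-less overload is gone; pass the TaskContext of the running task explicitly.
      agg.combineValuesByKey(iter, TaskContext.get())
    }.collect()
    println(combined.toSeq)
    sc.stop()
  }
}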
core/src/main/scala/org/apache/spark/SparkContext.scala (261 changes: 2 additions, 259 deletions)
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicIntege
import java.util.UUID.randomUUID

import scala.collection.JavaConverters._
import scala.collection.{Map, Set}
import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}
@@ -122,20 +122,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def this() = this(new SparkConf())

/**
* :: DeveloperApi ::
* Alternative constructor for setting preferred locations where Spark will create executors.
*
* @param config a [[org.apache.spark.SparkConf]] object specifying other Spark parameters
* @param preferredNodeLocationData not used. Left for backward compatibility.
*/
@deprecated("Passing in preferred locations has no effect at all, see SPARK-8949", "1.5.0")
@DeveloperApi
def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
this(config)
logWarning("Passing in preferred locations has no effect at all, see SPARK-8949")
}

/**
* Alternative constructor that allows setting common Spark properties directly
*
@@ -155,21 +141,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes.
* @param preferredNodeLocationData not used. Left for backward compatibility.
*/
@deprecated("Passing in preferred locations has no effect at all, see SPARK-10921", "1.6.0")
def this(
master: String,
appName: String,
sparkHome: String = null,
jars: Seq[String] = Nil,
environment: Map[String, String] = Map(),
preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
environment: Map[String, String] = Map()) =
{
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
if (preferredNodeLocationData.nonEmpty) {
logWarning("Passing in preferred locations has no effect at all, see SPARK-8949")
}
}

// NOTE: The below constructors could be consolidated using default arguments. Due to
@@ -267,8 +247,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Generate the random name for a temp folder in external block store.
// Add a timestamp as the suffix here to make it more safe
val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
@deprecated("Use externalBlockStoreFolderName instead.", "1.4.0")
val tachyonFolderName = externalBlockStoreFolderName

def isLocal: Boolean = (master == "local" || master.startsWith("local["))

@@ -641,11 +619,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
localProperties.set(props)
}

@deprecated("Properties no longer need to be explicitly initialized.", "1.0.0")
def initLocalProperties() {
localProperties.set(new Properties())
}

/**
* Set a local property that affects jobs submitted from this thread, such as the
* Spark fair scheduler pool.
@@ -1585,15 +1558,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
taskScheduler.schedulingMode
}

/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
@deprecated("adding files no longer creates local copies that need to be deleted", "1.0.0")
def clearFiles() {
addedFiles.clear()
}

/**
* Gets the locality information associated with the partition in a particular rdd
* @param rdd of interest
@@ -1685,15 +1649,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
postEnvironmentUpdate()
}

/**
* Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
* any new nodes.
*/
@deprecated("adding jars no longer creates local copies that need to be deleted", "1.0.0")
def clearJars() {
addedJars.clear()
}

// Shut down the SparkContext.
def stop() {
if (AsynchronousListenerBus.withinListenerThread.value) {
@@ -1864,63 +1819,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}


/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark.
*
* The allowLocal flag is deprecated as of Spark 1.5.0+.
*/
@deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit): Unit = {
if (allowLocal) {
logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
}
runJob(rdd, func, partitions, resultHandler)
}

/**
* Run a function on a given set of partitions in an RDD and return the results as an array.
*
* The allowLocal flag is deprecated as of Spark 1.5.0+.
*/
@deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
if (allowLocal) {
logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
}
runJob(rdd, func, partitions)
}

/**
* Run a job on a given set of partitions of an RDD, but take a function of type
* `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
*
* The allowLocal argument is deprecated as of Spark 1.5.0+.
*/
@deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
if (allowLocal) {
logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
}
runJob(rdd, func, partitions)
}

/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
@@ -2094,10 +1992,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
taskScheduler.defaultParallelism
}

/** Default min number of partitions for Hadoop RDDs when not given by user */
@deprecated("use defaultMinPartitions", "1.0.0")
def defaultMinSplits: Int = defaultMinPartitions

/**
* Default min number of partitions for Hadoop RDDs when not given by user
* Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
@@ -2364,113 +2258,6 @@ object SparkContext extends Logging {
*/
private[spark] val LEGACY_DRIVER_IDENTIFIER = "<driver>"

// The following deprecated objects have already been copied to `object AccumulatorParam` to
// make the compiler find them automatically. They are duplicate codes only for backward
// compatibility, please update `object AccumulatorParam` accordingly if you plan to modify the
// following ones.

@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.3.0")
object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double): Double = 0.0
}

@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.3.0")
object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int): Int = 0
}

@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.3.0")
object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long): Long = t1 + t2
def zero(initialValue: Long): Long = 0L
}

@deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " +
"backward compatibility.", "1.3.0")
object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float): Float = t1 + t2
def zero(initialValue: Float): Float = 0f
}

// The following deprecated functions have already been moved to `object RDD` to
// make the compiler find them automatically. They are still kept here for backward compatibility
// and just call the corresponding functions in `object RDD`.

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] =
RDD.rddToPairRDDFunctions(rdd)

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] =
RDD.rddToAsyncRDDActions(rdd)

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = {
val kf = implicitly[K => Writable]
val vf = implicitly[V => Writable]
// Set the Writable class to null and `SequenceFileRDDFunctions` will use Reflection to get it
implicit val keyWritableFactory = new WritableFactory[K](_ => null, kf)
implicit val valueWritableFactory = new WritableFactory[V](_ => null, vf)
RDD.rddToSequenceFileRDDFunctions(rdd)
}

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)] =
RDD.rddToOrderedRDDFunctions(rdd)

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions =
RDD.doubleRDDToDoubleRDDFunctions(rdd)

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]): DoubleRDDFunctions =
RDD.numericRDDToDoubleRDDFunctions(rdd)

// The following deprecated functions have already been moved to `object WritableFactory` to
// make the compiler find them automatically. They are still kept here for backward compatibility.

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def longToLongWritable(l: Long): LongWritable = new LongWritable(l)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def floatToFloatWritable(f: Float): FloatWritable = new FloatWritable(f)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def doubleToDoubleWritable(d: Double): DoubleWritable = new DoubleWritable(d)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def boolToBoolWritable (b: Boolean): BooleanWritable = new BooleanWritable(b)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def bytesToBytesWritable (aob: Array[Byte]): BytesWritable = new BytesWritable(aob)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def stringToText(s: String): Text = new Text(s)

private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
: ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u
@@ -2479,50 +2266,6 @@ object SparkContext extends Logging {
arr.map(x => anyToWritable(x)).toArray)
}

// The following deprecated functions have already been moved to `object WritableConverter` to
// make the compiler find them automatically. They are still kept here for backward compatibility
// and just call the corresponding functions in `object WritableConverter`.

@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.3.0")
def intWritableConverter(): WritableConverter[Int] =
WritableConverter.intWritableConverter()

@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.3.0")
def longWritableConverter(): WritableConverter[Long] =
WritableConverter.longWritableConverter()

@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.3.0")
def doubleWritableConverter(): WritableConverter[Double] =
WritableConverter.doubleWritableConverter()

@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.3.0")
def floatWritableConverter(): WritableConverter[Float] =
WritableConverter.floatWritableConverter()

@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.3.0")
def booleanWritableConverter(): WritableConverter[Boolean] =
WritableConverter.booleanWritableConverter()

@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.3.0")
def bytesWritableConverter(): WritableConverter[Array[Byte]] =
WritableConverter.bytesWritableConverter()

@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.3.0")
def stringWritableConverter(): WritableConverter[String] =
WritableConverter.stringWritableConverter()

@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.3.0")
def writableWritableConverter[T <: Writable](): WritableConverter[T] =
WritableConverter.writableWritableConverter()

/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to SparkContext.
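The Writable and WritableConverter forwarders removed above were back-compatibility shims only; the implicits in the WritableFactory and WritableConverter companion objects resolve on their own. A hedged round-trip sketch (the temporary path and object name are assumptions, not part of this commit):

import org.apache.spark.{SparkConf, SparkContext}

object WritableConverterMigration {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("writable-converter-migration"))
    val path = java.nio.file.Files.createTempDirectory("seq").resolve("pairs").toString
    // Writing picks up the Int/String WritableFactory implicits from their companion object.
    sc.parallelize(Seq(1 -> "a", 2 -> "b")).saveAsSequenceFile(path)
    // Reading resolves its WritableConverters implicitly from object WritableConverter.
    val back = sc.sequenceFile[Int, String](path).collect()
    println(back.toSeq)
    sc.stop()
  }
}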
(Diffs for the remaining changed files are not shown here.)
