Deprecated *With functions in Scala and added a few missing Java APIs
ScrapCodes committed Feb 27, 2014
1 parent 84f7ca1 commit 3ddc8bb
Showing 4 changed files with 30 additions and 5 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -240,6 +240,7 @@ class SparkContext(
localProperties.set(props)
}

@deprecated("Properties no longer need to be explicitly initialized.", "1.0.0")
def initLocalProperties() {
localProperties.set(new Properties())
}
@@ -308,7 +309,7 @@ class SparkContext(
private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)

def initDriverMetrics() {
private def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
}
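
A hedged usage sketch, not part of the patch: the deprecation message says local properties no longer need explicit initialization, so (assuming the thread-local Properties object is now created on demand) a caller can go straight to setLocalProperty. The app name and pool name below are illustrative only.

import org.apache.spark.SparkContext

// Sketch under the assumption above: no initLocalProperties() call is required.
object LocalPropertiesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "local-properties-sketch")
    // sc.initLocalProperties()  // deprecated in 1.0.0; no longer needed before setting properties
    sc.setLocalProperty("spark.scheduler.pool", "reportingPool")  // hypothetical pool name
    sc.stop()
  }
}
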
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -126,6 +126,8 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))

def generator = rdd.generator

override def toString = rdd.toString

/** Assign a name to this RDD */
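
A short hedged sketch, not part of the patch: the new generator accessor simply delegates to the wrapped RDD, so Java API users can read the same value Scala users get from rdd.generator. The context, app name, and data below are made up for illustration.

import org.apache.spark.api.java.JavaSparkContext

// Sketch: read the generator through the Java wrapper; it delegates to rdd.generator.
object JavaRddGeneratorSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext("local", "java-rdd-generator-sketch")
    val numbers = jsc.parallelize(java.util.Arrays.asList(1, 2, 3), 2)
    println(numbers.generator)  // same value the Scala side exposes as numbers.rdd.generator
    jsc.stop()
  }
}
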
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -92,6 +92,24 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork

private[spark] val env = sc.env

def isLocal = sc.isLocal

def sparkUser = sc.sparkUser

def master = sc.master

def appName = sc.appName

def jars = JavaConversions.seqAsJavaList(sc.jars)

def startTime = sc.startTime

/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism = sc.defaultParallelism

/** Default min number of partitions for Hadoop RDDs when not given by user */
def defaultMinSplits = sc.defaultMinSplits

/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
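
A hedged usage sketch, not part of the patch: the new read-only accessors mirror what the Scala SparkContext already exposes, so Java-API callers no longer have to reach into jsc.sc. The master and app name are placeholders.

import org.apache.spark.api.java.JavaSparkContext

// Sketch: exercise the newly exposed accessors on JavaSparkContext.
object JavaContextInfoSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext("local", "context-info-sketch")
    println(s"master=${jsc.master}, appName=${jsc.appName}, user=${jsc.sparkUser}")
    println(s"isLocal=${jsc.isLocal}, startTime=${jsc.startTime}")
    println(s"defaultParallelism=${jsc.defaultParallelism}, defaultMinSplits=${jsc.defaultMinSplits}")
    println(s"jars=${jsc.jars}")  // java.util.List[String]; typically empty in local mode
    jsc.stop()
  }
}
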
12 changes: 8 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -543,7 +543,8 @@ abstract class RDD[T: ClassTag](
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
def mapWith[A: ClassTag, U: ClassTag]
@deprecated("use mapPartitionsWithIndex", "1.0.0")
def mapWith[A, U: ClassTag]
(constructA: Int => A, preservesPartitioning: Boolean = false)
(f: (T, A) => U): RDD[U] = {
mapPartitionsWithIndex((index, iter) => {
@@ -557,7 +558,8 @@ abstract class RDD[T: ClassTag](
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
def flatMapWith[A: ClassTag, U: ClassTag]
@deprecated("use mapPartitionsWithIndex and flatMap", "1.0.0")
def flatMapWith[A, U: ClassTag]
(constructA: Int => A, preservesPartitioning: Boolean = false)
(f: (T, A) => Seq[U]): RDD[U] = {
mapPartitionsWithIndex((index, iter) => {
@@ -571,7 +573,8 @@ abstract class RDD[T: ClassTag](
* This additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit) {
@deprecated("use mapPartitionsWithIndex and foreach", "1.0.0")
def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit) {
mapPartitionsWithIndex { (index, iter) =>
val a = constructA(index)
iter.map(t => {f(t, a); t})
@@ -583,7 +586,8 @@ abstract class RDD[T: ClassTag](
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
@deprecated("use mapPartitionsWithIndex and filter", "1.0.0")
def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
mapPartitionsWithIndex((index, iter) => {
val a = constructA(index)
iter.filter(t => p(t, a))
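
A hedged migration sketch, not part of the patch: the deprecation messages point users at mapPartitionsWithIndex, where the per-partition value that the *With variants built via constructA can be constructed once at the top of the partition function. The RDD contents and per-partition setup below are made up for illustration.

import org.apache.spark.SparkContext
import scala.util.Random

// Sketch of the replacements suggested by the deprecation messages.
object WithMigrationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "with-migration-sketch")
    val data = sc.parallelize(1 to 100, 4)

    // Before: data.mapWith(index => new Random(index))((x, rng) => x * rng.nextDouble())
    // After: build the per-partition Random inside mapPartitionsWithIndex.
    val scaled = data.mapPartitionsWithIndex { (index, iter) =>
      val rng = new Random(index)
      iter.map(x => x * rng.nextDouble())
    }

    // Before: data.filterWith(index => index)((x, partIndex) => x % 2 == partIndex % 2)
    // After: filter inside mapPartitionsWithIndex, optionally preserving partitioning.
    val filtered = data.mapPartitionsWithIndex({ (index, iter) =>
      iter.filter(x => x % 2 == index % 2)
    }, preservesPartitioning = true)

    println(scaled.count() + " scaled, " + filtered.count() + " kept")
    sc.stop()
  }
}
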
