Skip to content

Commit

Permalink
Fix Java API for mapPartitionsWithIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Mar 11, 2014
1 parent e4962ab commit f484afc
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Expand Up @@ -72,11 +72,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
def mapPartitionsWithIndex[R: ClassTag](
f: JFunction2[java.lang.Integer, java.util.Iterator[T], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
preservesPartitioning))
def mapPartitionsWithIndex[R](f: MapPartitionsWithIndexFunction[T, R],
preservesPartitioning: Boolean = false): JavaRDD[R] = {
import scala.collection.JavaConverters._
def fn = (a: Int, b: Iterator[T]) => f.apply(a, asJavaIterator(b)).asScala
val newRdd = rdd.mapPartitionsWithIndex(fn, preservesPartitioning)(f.elementType())
new JavaRDD(newRdd)(f.elementType())
}

/**
* Return a new RDD by applying a function to all elements of this RDD.
Expand Down

0 comments on commit f484afc

Please sign in to comment.