Permalink
Browse files

Fix Java API for mapPartitionsWithIndex

  • Loading branch information...
1 parent 0b7b7fd commit 8d849a129e3d6489d697fd24a6c1a3b410f8b3ce @holdenk committed Feb 17, 2014
@@ -73,11 +73,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
- def mapPartitionsWithIndex[R: ClassTag](
- f: JFunction2[java.lang.Integer, java.util.Iterator[T], java.util.Iterator[R]],
- preservesPartitioning: Boolean = false): JavaRDD[R] =
- new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
- preservesPartitioning))
+  def mapPartitionsWithIndex[R](f: MapPartitionsWithIndexFunction[T, R],
+      preservesPartitioning: Boolean = false): JavaRDD[R] = {
+    import scala.collection.JavaConverters._
+    // Bridge the Java-facing function to the Scala signature: the partition's Scala iterator
+    // is handed to `f` as a java.util.Iterator, and the iterator `f` returns is viewed as a
+    // Scala iterator again. A `val` is used so the function object is built exactly once.
+    val fn = (index: Int, elems: Iterator[T]) => f.apply(index, elems.asJava).asScala
+    // `f.elementType()` is passed explicitly in the second parameter list, i.e. in the
+    // position where implicit evidence for R would otherwise be resolved, so callers of
+    // this method do not have to provide it themselves.
+    val newRdd = rdd.mapPartitionsWithIndex(fn, preservesPartitioning)(f.elementType())
+    new JavaRDD(newRdd)(f.elementType())
+  }
/**
* Return a new RDD by applying a function to all elements of this RDD.
@@ -415,6 +415,26 @@ public void javaDoubleRDDHistoGram() {
}
@Test
+  public void mapPartitionsWithIndex() {
+    // Request a single partition explicitly so the expected values do not depend on the
+    // context's default parallelism: the only partition index is then 0.
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 1);
+    JavaRDD<Integer> rddByIndex =
+      rdd.mapPartitionsWithIndex(new MapPartitionsWithIndexFunction<Integer, Integer>() {
+        @Override
+        public Iterator<Integer> call(Integer partitionIndex, Iterator<Integer> iter) {
+          // Multiply each element by a counter that starts at the partition index.
+          List<Integer> list = new ArrayList<Integer>();
+          int pos = partitionIndex;
+          while (iter.hasNext()) {
+            list.add(iter.next() * pos);
+            pos += 1;
+          }
+          return list.iterator();
+        }
+      }, false);
+    Assert.assertEquals(0, rddByIndex.first().intValue());
+    // The counter runs 0..4 over the elements 1..5: 1*0, 2*1, 3*2, 4*3, 5*4.
+    Assert.assertEquals(Arrays.asList(0, 2, 6, 12, 20), rddByIndex.collect());
+  }
+
+
+ @Test
public void map() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() {

0 comments on commit 8d849a1

Please sign in to comment.