From 63425ff29cd40fe4a8ae8f02472b1d7bc23e145e Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Mon, 24 Oct 2016 23:19:25 +0800
Subject: [PATCH 1/3] update V2

---
 .../spark/rdd/ZippedPartitionsRDD.scala       |  37 +++++--
 .../spark/util/ZipPartitionsRDDUtils.scala    |  87 +++++++++++++++
 .../util/ZipPartitionsRDDUtilsSuite.scala     | 101 ++++++++++++++++++
 3 files changed, 214 insertions(+), 11 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/util/ZipPartitionsRDDUtils.scala
 create mode 100644 core/src/test/scala/org/apache/spark/util/ZipPartitionsRDDUtilsSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index 3cb1231bd3477..2fcf617289370 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -45,7 +45,8 @@ private[spark] class ZippedPartitionsPartition(
 private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
     sc: SparkContext,
     var rdds: Seq[RDD[_]],
-    preservesPartitioning: Boolean = false)
+    preservesPartitioning: Boolean = false,
+    useFirstParentPreferredLocations: Boolean = false)
   extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) {
 
   override val partitioner =
@@ -58,10 +59,18 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
         s"Can't zip RDDs with unequal numbers of partitions: ${rdds.map(_.partitions.length)}")
     }
     Array.tabulate[Partition](numParts) { i =>
-      val prefs = rdds.map(rdd => rdd.preferredLocations(rdd.partitions(i)))
-      // Check whether there are any hosts that match all RDDs; otherwise return the union
-      val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
-      val locs = if (!exactMatchLocations.isEmpty) exactMatchLocations else prefs.flatten.distinct
+      var locs: Seq[String] = Nil
+      if (useFirstParentPreferredLocations) {
+        locs = rdds(0).preferredLocations(rdds(0).partitions(i))
+      }
+      if (locs.isEmpty) {
+        // If `useFirstParentPreferredLocations` is not set, or the first parent RDD reports no
+        // preferred locations for this partition, fall back to the default location strategy.
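+        // For example, per-partition locations Seq(Seq("h1", "h2"), Seq("h2", "h3")) intersect
+        // to the exact match Seq("h2"), while Seq(Seq("h1"), Seq("h2")) intersect to Nil, so
+        // their union Seq("h1", "h2") is used instead.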
+ val prefs = rdds.map(rdd => rdd.preferredLocations(rdd.partitions(i))) + // Check whether there are any hosts that match all RDDs; otherwise return the union + val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y)) + locs = if (!exactMatchLocations.isEmpty) exactMatchLocations else prefs.flatten.distinct + } new ZippedPartitionsPartition(i, rdds, locs) } } @@ -81,8 +90,10 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag] var f: (Iterator[A], Iterator[B]) => Iterator[V], var rdd1: RDD[A], var rdd2: RDD[B], - preservesPartitioning: Boolean = false) - extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) { + preservesPartitioning: Boolean = false, + useFirstParentPreferredLocations: Boolean = false) + extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning, + useFirstParentPreferredLocations) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions @@ -104,8 +115,10 @@ private[spark] class ZippedPartitionsRDD3 var rdd1: RDD[A], var rdd2: RDD[B], var rdd3: RDD[C], - preservesPartitioning: Boolean = false) - extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) { + preservesPartitioning: Boolean = false, + useFirstParentPreferredLocations: Boolean = false) + extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning, + useFirstParentPreferredLocations) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions @@ -131,8 +144,10 @@ private[spark] class ZippedPartitionsRDD4 var rdd2: RDD[B], var rdd3: RDD[C], var rdd4: RDD[D], - preservesPartitioning: Boolean = false) - extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) { + preservesPartitioning: Boolean = false, + useFirstParentPreferredLocations: Boolean = false) + extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning, + useFirstParentPreferredLocations) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions diff --git a/core/src/main/scala/org/apache/spark/util/ZipPartitionsRDDUtils.scala b/core/src/main/scala/org/apache/spark/util/ZipPartitionsRDDUtils.scala new file mode 100644 index 0000000000000..e6fc6696798bb --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/ZipPartitionsRDDUtils.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.util
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2, ZippedPartitionsRDD3,
+  ZippedPartitionsRDD4}
+
+object ZipPartitionsRDDUtils {
+
+  /**
+   * The zipPartitionsWithPreferredLocation helper function is similar to RDD.zipPartitions, but
+   * it can be used to set the zipPartitions task preferred locations to be consistent with
+   * the first zipped RDD. If the first zipped RDD does not have preferred locations,
+   * it will fall back to the default `zipPartitions` preferred location strategy.
+   * This helper function is useful when one large RDD is zipped with other small RDDs: we can set
+   * the first zipped RDD (the majorRdd parameter) to be the large RDD to improve data locality.
+   */
+  def zipPartitionsWithPreferredLocation[A: ClassTag, B: ClassTag, V: ClassTag]
+      (majorRdd: RDD[A], rdd2: RDD[B], preservesPartitioning: Boolean)
+      (f: (Iterator[A], Iterator[B]) => Iterator[V]): RDD[V] = {
+    val sc = majorRdd.sparkContext
+    majorRdd.withScope {
+      new ZippedPartitionsRDD2(sc, sc.clean(f), majorRdd, rdd2, preservesPartitioning,
+        useFirstParentPreferredLocations = true)
+    }
+  }
+
+  def zipPartitionsWithPreferredLocation[A: ClassTag, B: ClassTag, V: ClassTag]
+      (majorRdd: RDD[A], rdd2: RDD[B])
+      (f: (Iterator[A], Iterator[B]) => Iterator[V]): RDD[V] = {
+    zipPartitionsWithPreferredLocation(majorRdd, rdd2, preservesPartitioning = false)(f)
+  }
+
+  def zipPartitionsWithPreferredLocation[A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag]
+      (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
+      (f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = {
+    val sc = majorRdd.sparkContext
+    majorRdd.withScope {
+      new ZippedPartitionsRDD3(sc, sc.clean(f), majorRdd, rdd2, rdd3,
+        preservesPartitioning, useFirstParentPreferredLocations = true)
+    }
+  }
+
+  def zipPartitionsWithPreferredLocation[A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag]
+      (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C])
+      (f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = {
+    zipPartitionsWithPreferredLocation(majorRdd, rdd2, rdd3, preservesPartitioning = false)(f)
+  }
+
+  def zipPartitionsWithPreferredLocation
+      [A: ClassTag, B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
+      (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D],
+      preservesPartitioning: Boolean)
+      (f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = {
+    val sc = majorRdd.sparkContext
+    majorRdd.withScope {
+      new ZippedPartitionsRDD4(sc, sc.clean(f), majorRdd, rdd2, rdd3, rdd4,
+        preservesPartitioning, useFirstParentPreferredLocations = true)
+    }
+  }
+
+  def zipPartitionsWithPreferredLocation
+      [A: ClassTag, B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
+      (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
+      (f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = {
+    zipPartitionsWithPreferredLocation(
+      majorRdd, rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
+  }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/util/ZipPartitionsRDDUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ZipPartitionsRDDUtilsSuite.scala
new file mode 100644
index 0000000000000..99e66c4a78983
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/ZipPartitionsRDDUtilsSuite.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.File
+
+import org.apache.spark.{SharedSparkContext, SparkFunSuite}
+
+class ZipPartitionsRDDUtilsSuite extends SparkFunSuite with SharedSparkContext {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+  }
+
+  test("zipPartitionsWithPreferredLocation 2 rdds") {
+    val rdd1 = sc.makeRDD(List((1, List("a")), (2, List("b")), (3, List("c"))))
+    val rdd2 = sc.makeRDD(List((1, List("d")), (2, List("e")), (3, List("f"))))
+
+    val res1 = ZipPartitionsRDDUtils.zipPartitionsWithPreferredLocation(rdd1, rdd2) {
+      (iter1, iter2) => {
+        iter1.zip(iter2).map(t => t._1 + t._2)
+      }
+    }
+    assert(res1.collect() === Array(2, 4, 6))
+    assert(res1.partitions.map(split => res1.preferredLocations(split))
+      === Array(List("a"), List("b"), List("c")))
+
+    val res2 = ZipPartitionsRDDUtils.zipPartitionsWithPreferredLocation(rdd2, rdd1) {
+      (iter1, iter2) => {
+        iter1.zip(iter2).map(t => t._1 + t._2)
+      }
+    }
+    assert(res2.collect() === Array(2, 4, 6))
+    assert(res2.partitions.map(split => res2.preferredLocations(split))
+      === Array(List("d"), List("e"), List("f")))
+  }
+
+  test("zipPartitionsWithPreferredLocation 3 rdds") {
+    val rdd1 = sc.makeRDD(List((1, List("a")), (2, List("b")), (3, List("c"))))
+    val rdd2 = sc.makeRDD(List((1, List("d")), (2, List("e")), (3, List("f"))))
+    val rdd3 = sc.makeRDD(List((1, List("g")), (2, List("h")), (3, List("i"))))
+
+    val res1 = ZipPartitionsRDDUtils.zipPartitionsWithPreferredLocation(rdd1, rdd2, rdd3) {
+      (iter1, iter2, iter3) => {
+        iter1.zip(iter2).zip(iter3).map(t => t._1._1 + t._1._2 + t._2)
+      }
+    }
+    assert(res1.collect() === Array(3, 6, 9))
+    assert(res1.partitions.map(split => res1.preferredLocations(split))
+      === Array(List("a"), List("b"), List("c")))
+  }
+
+  test("zipPartitionsWithPreferredLocation 4 rdds") {
+    val rdd1 = sc.makeRDD(List((1, List("a")), (2, List("b")), (3, List("c"))))
+    val rdd2 = sc.makeRDD(List((1, List("d")), (2, List("e")), (3, List("f"))))
+    val rdd3 = sc.makeRDD(List((1, List("g")), (2, List("h")), (3, List("i"))))
+    val rdd4 = sc.makeRDD(List((1, List("j")), (2, List("k")), (3, List("l"))))
+
+    val res1 = ZipPartitionsRDDUtils.zipPartitionsWithPreferredLocation(rdd1, rdd2, rdd3, rdd4) {
+      (iter1, iter2, iter3, iter4) => {
+        iter1.zip(iter2).zip(iter3.zip(iter4)).map(t => t._1._1 + t._1._2 + t._2._1 + t._2._2)
+      }
+    }
+    assert(res1.collect() === Array(4, 8, 12))
+    assert(res1.partitions.map(split => res1.preferredLocations(split))
+      === Array(List("a"), List("b"), List("c")))
+  }
+
+  test("zipPartitionsWithPreferredLocation 3 rdds when major rdd preferred locations are Nil") {
+    val rdd1 = sc.makeRDD(List((1, Nil), (2, Nil), (3, Nil)))
+    val rdd2 = sc.makeRDD(List((1, List("d")), (2, List("e")), (3, List("f"))))
+    val rdd3 = sc.makeRDD(List((1, List("g")), (2, List("h")), (3, List("i"))))
+
+    val res1 = ZipPartitionsRDDUtils.zipPartitionsWithPreferredLocation(rdd1, rdd2, rdd3) {
+      (iter1, iter2, iter3) => {
+        iter1.zip(iter2).zip(iter3).map(t => t._1._1 + t._1._2 + t._2)
+      }
+    }
+    assert(res1.collect() === Array(3, 6, 9))
+
+    // In this case zipPartitionsWithPreferredLocation falls back to the default
+    // zipPartitions preferred location strategy.
+    assert(res1.partitions.map(split => res1.preferredLocations(split))
+      === Array(List("d", "g"), List("e", "h"), List("f", "i")))
+  }
+}

From 7f7296afb1f3e23332c1dc3a223ad15996f9898c Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Wed, 26 Oct 2016 21:51:18 +0800
Subject: [PATCH 2/3] some minor updates.

---
 .../{util => rdd}/ZipPartitionsRDDUtils.scala | 28 +++++++++----------
 .../util/ZipPartitionsRDDUtilsSuite.scala     |  1 +
 2 files changed, 15 insertions(+), 14 deletions(-)
 rename core/src/main/scala/org/apache/spark/{util => rdd}/ZipPartitionsRDDUtils.scala (81%)

diff --git a/core/src/main/scala/org/apache/spark/util/ZipPartitionsRDDUtils.scala b/core/src/main/scala/org/apache/spark/rdd/ZipPartitionsRDDUtils.scala
similarity index 81%
rename from core/src/main/scala/org/apache/spark/util/ZipPartitionsRDDUtils.scala
rename to core/src/main/scala/org/apache/spark/rdd/ZipPartitionsRDDUtils.scala
index e6fc6696798bb..65ac05f723507 100644
--- a/core/src/main/scala/org/apache/spark/util/ZipPartitionsRDDUtils.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZipPartitionsRDDUtils.scala
@@ -15,13 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.spark.util
+package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2, ZippedPartitionsRDD3,
-  ZippedPartitionsRDD4}
-
 object ZipPartitionsRDDUtils {
 
   /**
@@ -30,14 +27,17 @@ object ZipPartitionsRDDUtils {
    * the first zipped RDD. If the first zipped RDD does not have preferred locations,
    * it will fall back to the default `zipPartitions` preferred location strategy.
    * This helper function is useful when one large RDD is zipped with other small RDDs: we can set
-   * the first zipped RDD (the majorRdd parameter) to be the large RDD to improve data locality.
+   * the first zipped RDD (the `majorRdd` parameter) to be the large RDD to improve data locality.
+   *
+   * `preservesPartitioner` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/ def zipPartitionsWithPreferredLocation[A: ClassTag, B: ClassTag, V: ClassTag] - (majorRdd: RDD[A], rdd2: RDD[B], preservesPartitioning: Boolean) + (majorRdd: RDD[A], rdd2: RDD[B], preservesPartitioner: Boolean) (f: (Iterator[A], Iterator[B]) => Iterator[V]): RDD[V] = { val sc = majorRdd.sparkContext majorRdd.withScope { - new ZippedPartitionsRDD2(sc, sc.clean(f), majorRdd, rdd2, preservesPartitioning, + new ZippedPartitionsRDD2(sc, sc.clean(f), majorRdd, rdd2, preservesPartitioner, useFirstParentPreferredLocations = true) } } @@ -45,34 +45,34 @@ object ZipPartitionsRDDUtils { def zipPartitionsWithPreferredLocation[A: ClassTag, B: ClassTag, V: ClassTag] (majorRdd: RDD[A], rdd2: RDD[B]) (f: (Iterator[A], Iterator[B]) => Iterator[V]): RDD[V] = { - zipPartitionsWithPreferredLocation(majorRdd, rdd2, preservesPartitioning = false)(f) + zipPartitionsWithPreferredLocation(majorRdd, rdd2, preservesPartitioner = false)(f) } def zipPartitionsWithPreferredLocation[A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag] - (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean) + (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C], preservesPartitioner: Boolean) (f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = { val sc = majorRdd.sparkContext majorRdd.withScope { new ZippedPartitionsRDD3(sc, sc.clean(f), majorRdd, rdd2, rdd3, - preservesPartitioning, useFirstParentPreferredLocations = true) + preservesPartitioner, useFirstParentPreferredLocations = true) } } def zipPartitionsWithPreferredLocation[A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag] (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C]) (f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = { - zipPartitionsWithPreferredLocation(majorRdd, rdd2, rdd3, preservesPartitioning = false)(f) + zipPartitionsWithPreferredLocation(majorRdd, rdd2, rdd3, preservesPartitioner = false)(f) } def zipPartitionsWithPreferredLocation [A: ClassTag, B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], - preservesPartitioning: Boolean) + preservesPartitioner: Boolean) (f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = { val sc = majorRdd.sparkContext majorRdd.withScope { new ZippedPartitionsRDD4(sc, sc.clean(f), majorRdd, rdd2, rdd3, rdd4, - preservesPartitioning, useFirstParentPreferredLocations = true) + preservesPartitioner, useFirstParentPreferredLocations = true) } } @@ -81,7 +81,7 @@ object ZipPartitionsRDDUtils { (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D]) (f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = { zipPartitionsWithPreferredLocation( - majorRdd, rdd2, rdd3, rdd4, preservesPartitioning = false)(f) + majorRdd, rdd2, rdd3, rdd4, preservesPartitioner = false)(f) } } diff --git a/core/src/test/scala/org/apache/spark/util/ZipPartitionsRDDUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ZipPartitionsRDDUtilsSuite.scala index 99e66c4a78983..1006559dd1387 100644 --- a/core/src/test/scala/org/apache/spark/util/ZipPartitionsRDDUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ZipPartitionsRDDUtilsSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import java.io.File import org.apache.spark.{SharedSparkContext, SparkFunSuite} +import org.apache.spark.rdd.ZipPartitionsRDDUtils class ZipPartitionsRDDUtilsSuite extends SparkFunSuite with SharedSparkContext { From 439def9e5fd46f0650cba24c9232c3f62349023e Mon Sep 17 00:00:00 2001 From: 
WeichenXu
Date: Fri, 28 Oct 2016 23:54:54 +0800
Subject: [PATCH 3/3] several minor updates V2

---
 .../spark/rdd/ZipPartitionsRDDUtils.scala     | 68 ++++++++++++++-----
 1 file changed, 52 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/ZipPartitionsRDDUtils.scala b/core/src/main/scala/org/apache/spark/rdd/ZipPartitionsRDDUtils.scala
index 65ac05f723507..e292b60d361bf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZipPartitionsRDDUtils.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZipPartitionsRDDUtils.scala
@@ -19,69 +19,105 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
+/**
+ * `ZipPartitionsRDDUtils` contains a series of `zipPartitionsWithPreferredLocation` helper
+ * functions which are similar to `RDD.zipPartitions`, but which set the zipPartitions task
+ * preferred locations to be consistent with the first zipped RDD.
+ *
+ * If the first zipped RDD does not have preferred locations, they fall back to the default
+ * `zipPartitions` preferred location strategy.
+ */
 object ZipPartitionsRDDUtils {
 
   /**
-   * The zipPartitionsWithPreferredLocation helper function is similar to RDD.zipPartitions, but
-   * it can be used to set the zipPartitions task preferred locations to be consistent with
-   * the first zipped RDD. If the first zipped RDD does not have preferred locations,
-   * it will fall back to the default `zipPartitions` preferred location strategy.
-   * This helper function is useful when one large RDD is zipped with other small RDDs: we can set
-   * the first zipped RDD (the `majorRdd` parameter) to be the large RDD to improve data locality.
+   * This helper function is useful when one large RDD is zipped with another small RDD: set
+   * the first zipped RDD (the `majorRdd` parameter) to be the large RDD to improve data
+   * locality.
    *
-   * `preservesPartitioner` indicates whether the input function preserves the partitioner, which
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
    * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   def zipPartitionsWithPreferredLocation[A: ClassTag, B: ClassTag, V: ClassTag]
-      (majorRdd: RDD[A], rdd2: RDD[B], preservesPartitioner: Boolean)
+      (majorRdd: RDD[A], rdd2: RDD[B], preservesPartitioning: Boolean)
       (f: (Iterator[A], Iterator[B]) => Iterator[V]): RDD[V] = {
     val sc = majorRdd.sparkContext
     majorRdd.withScope {
-      new ZippedPartitionsRDD2(sc, sc.clean(f), majorRdd, rdd2, preservesPartitioner,
+      new ZippedPartitionsRDD2(sc, sc.clean(f), majorRdd, rdd2, preservesPartitioning,
         useFirstParentPreferredLocations = true)
     }
   }
 
+  /**
+   * This helper function is useful when one large RDD is zipped with another small RDD: set
+   * the first zipped RDD (the `majorRdd` parameter) to be the large RDD to improve data
+   * locality.
+   */
   def zipPartitionsWithPreferredLocation[A: ClassTag, B: ClassTag, V: ClassTag]
       (majorRdd: RDD[A], rdd2: RDD[B])
       (f: (Iterator[A], Iterator[B]) => Iterator[V]): RDD[V] = {
-    zipPartitionsWithPreferredLocation(majorRdd, rdd2, preservesPartitioner = false)(f)
+    zipPartitionsWithPreferredLocation(majorRdd, rdd2, preservesPartitioning = false)(f)
   }
 
+  /**
+   * This helper function is useful when one large RDD is zipped with two other small RDDs: set
+   * the first zipped RDD (the `majorRdd` parameter) to be the large RDD to improve data
+   * locality.
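+   *
+   * For example (an illustrative sketch; `bigRdd`, `small1` and `small2` stand for any three
+   * RDDs with equal partition counts, the first typically the largest):
+   * {{{
+   *   ZipPartitionsRDDUtils.zipPartitionsWithPreferredLocation(bigRdd, small1, small2,
+   *     preservesPartitioning = false) {
+   *     (it1, it2, it3) => it1.zip(it2.zip(it3)).map { case (a, (b, c)) => (a, b, c) }
+   *   }
+   * }}}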
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
+   */
   def zipPartitionsWithPreferredLocation[A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag]
-      (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C], preservesPartitioner: Boolean)
+      (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
       (f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = {
     val sc = majorRdd.sparkContext
     majorRdd.withScope {
       new ZippedPartitionsRDD3(sc, sc.clean(f), majorRdd, rdd2, rdd3,
-        preservesPartitioner, useFirstParentPreferredLocations = true)
+        preservesPartitioning, useFirstParentPreferredLocations = true)
     }
   }
 
+  /**
+   * This helper function is useful when one large RDD is zipped with two other small RDDs: set
+   * the first zipped RDD (the `majorRdd` parameter) to be the large RDD to improve data
+   * locality.
+   */
   def zipPartitionsWithPreferredLocation[A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag]
       (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C])
       (f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = {
-    zipPartitionsWithPreferredLocation(majorRdd, rdd2, rdd3, preservesPartitioner = false)(f)
+    zipPartitionsWithPreferredLocation(majorRdd, rdd2, rdd3, preservesPartitioning = false)(f)
   }
 
+  /**
+   * This helper function is useful when one large RDD is zipped with three other small RDDs: set
+   * the first zipped RDD (the `majorRdd` parameter) to be the large RDD to improve data
+   * locality.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
+   */
   def zipPartitionsWithPreferredLocation
       [A: ClassTag, B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
       (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D],
-      preservesPartitioner: Boolean)
+      preservesPartitioning: Boolean)
       (f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = {
     val sc = majorRdd.sparkContext
     majorRdd.withScope {
       new ZippedPartitionsRDD4(sc, sc.clean(f), majorRdd, rdd2, rdd3, rdd4,
-        preservesPartitioner, useFirstParentPreferredLocations = true)
+        preservesPartitioning, useFirstParentPreferredLocations = true)
     }
   }
 
+  /**
+   * This helper function is useful when one large RDD is zipped with three other small RDDs: set
+   * the first zipped RDD (the `majorRdd` parameter) to be the large RDD to improve data
+   * locality.
+   */
   def zipPartitionsWithPreferredLocation
       [A: ClassTag, B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
       (majorRdd: RDD[A], rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
       (f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = {
     zipPartitionsWithPreferredLocation(
-      majorRdd, rdd2, rdd3, rdd4, preservesPartitioner = false)(f)
+      majorRdd, rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
   }
 
 }
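
A minimal driver-side sketch of the intended call pattern (illustrative only; it assumes a
Spark build with this series applied, and `tagPartitions`, `bigRdd` and `smallRdd` are
hypothetical names):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.ZipPartitionsRDDUtils

    def tagPartitions(sc: SparkContext) = {
      // Stand-ins for the real inputs: in practice bigRdd would be a large, location-aware
      // RDD (e.g. HDFS-backed or cached), and smallRdd a small RDD with the same number of
      // partitions.
      val bigRdd = sc.parallelize(1 to 1000000, 100)
      val smallRdd = sc.parallelize(1 to 100, 100)

      // Tasks prefer bigRdd's locations when it reports any; otherwise the default
      // zipPartitions preferred location strategy applies.
      ZipPartitionsRDDUtils.zipPartitionsWithPreferredLocation(bigRdd, smallRdd) {
        (bigIter, smallIter) =>
          val tag = smallIter.next() // exactly one element per smallRdd partition here
          bigIter.map(x => (tag, x))
      }
    }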