From d1caa65c7440cb92aed40dcab4b95c03389350c4 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 22 Jul 2014 02:15:44 -0700
Subject: [PATCH 1/4] add doc to explain preservesPartitioning

fix wrong usage of preservesPartitioning
make sample preserve partitioning
---
 .../spark/rdd/PartitionwiseSampledRDD.scala   |  4 ++++
 .../main/scala/org/apache/spark/rdd/RDD.scala | 20 +++++++++++++-------
 .../scala/org/apache/spark/rdd/RDDSuite.scala |  9 +++++++++
 .../BinaryClassificationMetrics.scala         |  4 ++--
 .../mllib/linalg/distributed/RowMatrix.scala  |  8 ++++----
 .../spark/mllib/recommendation/ALS.scala      |  2 +-
 .../org/apache/spark/mllib/util/MLUtils.scala |  4 ++--
 7 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
index b5b8a5706deb3..3ac27f2ae4572 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -39,6 +39,7 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
  *
  * @param prev RDD to be sampled
  * @param sampler a random sampler
+ * @param preservesPartitioning whether the partitioner of the parent RDD should be preserved
  * @param seed random seed
  * @tparam T input RDD item type
  * @tparam U sampled RDD item type
@@ -46,9 +47,12 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
 private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
     prev: RDD[T],
     sampler: RandomSampler[T, U],
+    @transient preservesPartitioning: Boolean,
     @transient seed: Long = Utils.random.nextLong)
   extends RDD[U](prev) {
 
+  @transient override val partitioner = if (preservesPartitioning) prev.partitioner else None
+
   override def getPartitions: Array[Partition] = {
     val random = new Random(seed)
     firstParent[T].partitions.map(x => new PartitionwiseSampledRDDPartition(x, random.nextLong()))
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a1f2827248891..6e00d4e7335e8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -356,9 +356,9 @@ abstract class RDD[T: ClassTag](
       seed: Long = Utils.random.nextLong): RDD[T] = {
     require(fraction >= 0.0, "Invalid fraction value: " + fraction)
     if (withReplacement) {
-      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), seed)
+      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
     } else {
-      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), seed)
+      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
     }
   }
 
@@ -374,7 +374,7 @@ abstract class RDD[T: ClassTag](
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
-      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), seed)
+      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), true, seed)
     }.toArray
   }
 
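A note on the randomSplit hunk above: the weights become cumulative probability ranges over one shared sampling stream, so the resulting splits are disjoint and together cover the parent RDD. A minimal sketch of the bounds computation, using hypothetical weights:

  val weights = Array(1.0, 2.0, 3.0)
  val sum = weights.sum
  val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
  // normalizedCumWeights == Array(0.0, 0.1666..., 0.5, 1.0)
  normalizedCumWeights.sliding(2).foreach(x => println(s"[${x(0)}, ${x(1)})"))
  // prints the three sampling ranges [0.0, 0.1666...), [0.1666..., 0.5), [0.5, 1.0)

The doc hunks for mapPartitions and its variants continue below.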
@@ -585,7 +585,9 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Return a new RDD by applying a function to each partition of this RDD.
+   * Return a new RDD by applying a function to each partition of this RDD. Note that
+   * `preservesPartitioning` means whether to preserve the partitioner of this RDD, which should be
+   * `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   def mapPartitions[U: ClassTag](
       f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
@@ -595,7 +597,9 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
-   * of the original partition.
+   * of the original partition. Note that `preservesPartitioning` means whether to preserve the
+   * partitioner of this RDD, which should be `false` unless this is a pair RDD and the input
+   * function doesn't modify the keys.
    */
   def mapPartitionsWithIndex[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
@@ -606,7 +610,9 @@ abstract class RDD[T: ClassTag](
   /**
    * :: DeveloperApi ::
    * Return a new RDD by applying a function to each partition of this RDD. This is a variant of
-   * mapPartitions that also passes the TaskContext into the closure.
+   * mapPartitions that also passes the TaskContext into the closure. Note that
+   * `preservesPartitioning` means whether to preserve the partitioner of this RDD, which should be
+   * `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   @DeveloperApi
   def mapPartitionsWithContext[U: ClassTag](
@@ -689,7 +695,7 @@ abstract class RDD[T: ClassTag](
    * a map on the other).
    */
   def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = {
-    zipPartitions(other, true) { (thisIter, otherIter) =>
+    zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
       new Iterator[(T, U)] {
         def hasNext = (thisIter.hasNext, otherIter.hasNext) match {
           case (true, true) => true
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 2924de112934c..53499cfb8e832 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -523,6 +523,15 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(sortedTopK === nums.sorted(ord).take(5))
   }
 
+  test("sample preserves partitioner") {
+    val partitioner = new HashPartitioner(2)
+    val rdd = sc.parallelize(Seq((0, 1), (2, 3))).partitionBy(partitioner)
+    for (withReplacement <- Seq((true, false))) {
+      val sampled = rdd.sample(withReplacement, 1.0)
+      assert(sampled.partitioner.get === partitiner)
+    }
+  }
+
   test("takeSample") {
     val n = 1000000
     val data = sc.parallelize(1 to n, 2)
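A note on the zip hunk above: the zipped elements are (T, U) pairs whose keys are whole elements of this RDD, so the old partitioner no longer describes them; hence the switch to preservesPartitioning = false. A small sketch of the mismatch, with made-up data and an assumed SparkContext sc:

  import org.apache.spark.HashPartitioner

  val left = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(2))
  // left's partitioner hashes Int keys, but the zipped elements are
  // ((Int, String), String) pairs, so that partitioner no longer applies.
  val zipped = left.zip(left.values)
  assert(zipped.partitioner == None)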
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
index 079743742d86d..1af40de2c7fcf 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
@@ -103,11 +103,11 @@ class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) extends
       mergeValue = (c: BinaryLabelCounter, label: Double) => c += label,
       mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2
     ).sortByKey(ascending = false)
-    val agg = counts.values.mapPartitions({ iter =>
+    val agg = counts.values.mapPartitions { iter =>
       val agg = new BinaryLabelCounter()
       iter.foreach(agg += _)
       Iterator(agg)
-    }, preservesPartitioning = true).collect()
+    }.collect()
     val partitionwiseCumulativeCounts = agg.scanLeft(new BinaryLabelCounter())(
       (agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c)
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index f4c403bc7861c..8c2b044ea73f2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -377,9 +377,9 @@ class RowMatrix(
       s"Only support dense matrix at this time but found ${B.getClass.getName}.")
 
     val Bb = rows.context.broadcast(B.toBreeze.asInstanceOf[BDM[Double]].toDenseVector.toArray)
-    val AB = rows.mapPartitions({ iter =>
+    val AB = rows.mapPartitions { iter =>
       val Bi = Bb.value
-      iter.map(row => {
+      iter.map { row =>
         val v = BDV.zeros[Double](k)
         var i = 0
         while (i < k) {
@@ -387,8 +387,8 @@ class RowMatrix(
         i += 1
       }
       Vectors.fromBreeze(v)
-      })
-    }, preservesPartitioning = true)
+      }
+    }
     new RowMatrix(AB, nRows, B.numCols)
   }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index cc56fd6ef28d6..7487fe8a919d0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -430,7 +430,7 @@ class ALS private (
       val inLinkBlock = makeInLinkBlock(numProductBlocks, ratings, productPartitioner)
       val outLinkBlock = makeOutLinkBlock(numProductBlocks, ratings, productPartitioner)
       Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
-    }, true)
+    }, preservesPartitioning = true)
     val inLinks = links.mapValues(_._1)
     val outLinks = links.mapValues(_._2)
     inLinks.persist(StorageLevel.MEMORY_AND_DISK)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index aaf92a1a8869a..30de24ad89f98 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -264,8 +264,8 @@ object MLUtils {
     (1 to numFolds).map { fold =>
       val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF,
         complement = false)
-      val validation = new PartitionwiseSampledRDD(rdd, sampler, seed)
-      val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), seed)
+      val validation = new PartitionwiseSampledRDD(rdd, sampler, true, seed)
+      val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), true, seed)
       (training, validation)
     }.toArray
   }
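A note on the BinaryClassificationMetrics hunk above: counts.values already has no partitioner, so the preservesPartitioning = true it used to pass preserved nothing, and dropping the flag is the honest form. A sketch of why, with simplified types and an assumed SparkContext sc:

  val counts = sc.parallelize(Seq((0.9, 1L), (0.1, 2L))).sortByKey(ascending = false)
  assert(counts.partitioner.isDefined)       // the sort installs a RangePartitioner
  assert(counts.values.partitioner == None)  // values() is a plain map and drops it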
From 357575c41f531bb178456e3b81e5c1007b74219c Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 22 Jul 2014 09:57:34 -0700
Subject: [PATCH 2/4] fix unit test

---
 .../org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala | 4 ++--
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala     | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
index 5dd8de319a654..a0483886f8db3 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PartitionwiseSampledRDDSuite.scala
@@ -43,7 +43,7 @@ class PartitionwiseSampledRDDSuite extends FunSuite with SharedSparkContext {
   test("seed distribution") {
     val rdd = sc.makeRDD(Array(1L, 2L, 3L, 4L), 2)
     val sampler = new MockSampler
-    val sample = new PartitionwiseSampledRDD[Long, Long](rdd, sampler, 0L)
+    val sample = new PartitionwiseSampledRDD[Long, Long](rdd, sampler, false, 0L)
     assert(sample.distinct().count == 2, "Seeds must be different.")
   }
 
@@ -52,7 +52,7 @@ class PartitionwiseSampledRDDSuite extends FunSuite with SharedSparkContext {
     // We want to make sure there are no concurrency issues.
     val rdd = sc.parallelize(0 until 111, 10)
    for (sampler <- Seq(new BernoulliSampler[Int](0.5), new PoissonSampler[Int](0.5))) {
-      val sampled = new PartitionwiseSampledRDD[Int, Int](rdd, sampler)
+      val sampled = new PartitionwiseSampledRDD[Int, Int](rdd, sampler, true)
       sampled.zip(sampled).count()
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 53499cfb8e832..6654ec2d7c656 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -526,9 +526,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("sample preserves partitioner") {
     val partitioner = new HashPartitioner(2)
     val rdd = sc.parallelize(Seq((0, 1), (2, 3))).partitionBy(partitioner)
-    for (withReplacement <- Seq((true, false))) {
+    for (withReplacement <- Seq(true, false)) {
       val sampled = rdd.sample(withReplacement, 1.0)
-      assert(sampled.partitioner.get === partitiner)
+      assert(sampled.partitioner === rdd.partitioner)
     }
   }
 
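The RDDSuite fix above corrects two slips that kept the original test from compiling: partitiner is a misspelled name, and Seq((true, false)) is a one-element sequence holding a single tuple rather than the two Boolean cases. For comparison:

  Seq((true, false)).foreach(println)  // one element: the tuple (true,false)
  Seq(true, false).foreach(println)    // two elements: true, then false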
From 3b1ba1939236b9f512f665bb5939323c30b2dffa Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 22 Jul 2014 22:48:45 -0700
Subject: [PATCH 3/4] update doc

---
 .../apache/spark/rdd/PartitionwiseSampledRDD.scala |  2 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 12 ++++++------
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
index 3ac27f2ae4572..a637d6f15b7e5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -39,7 +39,7 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
  *
  * @param prev RDD to be sampled
  * @param sampler a random sampler
- * @param preservesPartitioning whether the partitioner of the parent RDD should be preserved
+ * @param preservesPartitioning whether the sampler preserves the partitioner of the parent RDD
  * @param seed random seed
  * @tparam T input RDD item type
  * @tparam U sampled RDD item type
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 6e00d4e7335e8..711c1f543768e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -586,8 +586,8 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Return a new RDD by applying a function to each partition of this RDD. Note that
-   * `preservesPartitioning` means whether to preserve the partitioner of this RDD, which should be
-   * `false` unless this is a pair RDD and the input function doesn't modify the keys.
+   * `preservesPartitioning` means whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   def mapPartitions[U: ClassTag](
       f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
@@ -597,8 +597,8 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
-   * of the original partition. Note that `preservesPartitioning` means whether to preserve the
-   * partitioner of this RDD, which should be `false` unless this is a pair RDD and the input
+   * of the original partition. Note that `preservesPartitioning` means whether the input function
+   * preserves the partitioner, which should be `false` unless this is a pair RDD and the input
    * function doesn't modify the keys.
    */
   def mapPartitionsWithIndex[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
@@ -611,8 +611,8 @@ abstract class RDD[T: ClassTag](
   /**
    * :: DeveloperApi ::
    * Return a new RDD by applying a function to each partition of this RDD. This is a variant of
-   * mapPartitions that also passes the TaskContext into the closure. Note that
-   * `preservesPartitioning` means whether to preserve the partitioner of this RDD, which should be
-   * `false` unless this is a pair RDD and the input function doesn't modify the keys.
+   * mapPartitions that also passes the TaskContext into the closure. Note that
+   * `preservesPartitioning` means whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   @DeveloperApi
   def mapPartitionsWithContext[U: ClassTag](
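The reworded contract is easiest to see on a pair RDD. A minimal sketch, assuming a SparkContext sc: keeping keys intact makes the flag safe, while rewriting keys makes it a claim that can wrongly skip a shuffle later:

  val pairs = sc.parallelize(Seq((1, 10), (2, 20)))

  // Keys untouched: the parent's partitioner (if any) still describes the output.
  val stringified = pairs.mapPartitions(
    iter => iter.map { case (k, v) => (k, v.toString) },
    preservesPartitioning = true)

  // Keys rewritten: claiming `true` here would mislead the scheduler,
  // so the default `false` is the only correct choice.
  val rekeyed = pairs.mapPartitions(iter => iter.map { case (k, v) => (k + 1, v) })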
From b361e656e4b1eec7de7acd687e23ffbaecb73470 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Tue, 22 Jul 2014 23:23:40 -0700
Subject: [PATCH 4/4] update doc based on pwendell's comments

---
 .../main/scala/org/apache/spark/rdd/RDD.scala | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 711c1f543768e..c1bafab3e7491 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -585,8 +585,9 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Return a new RDD by applying a function to each partition of this RDD. Note that
-   * `preservesPartitioning` means whether the input function preserves the partitioner, which
-   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
+   * Return a new RDD by applying a function to each partition of this RDD.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   def mapPartitions[U: ClassTag](
       f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
@@ -597,8 +598,10 @@ abstract class RDD[T: ClassTag](
   /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
-   * of the original partition. Note that `preservesPartitioning` means whether the input function
-   * preserves the partitioner, which should be `false` unless this is a pair RDD and the input
-   * function doesn't modify the keys.
+   * of the original partition.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   def mapPartitionsWithIndex[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
@@ -610,8 +612,9 @@ abstract class RDD[T: ClassTag](
   /**
    * :: DeveloperApi ::
    * Return a new RDD by applying a function to each partition of this RDD. This is a variant of
-   * mapPartitions that also passes the TaskContext into the closure. Note that
-   * `preservesPartitioning` means whether the input function preserves the partitioner, which
-   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
+   * mapPartitions that also passes the TaskContext into the closure.
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
   @DeveloperApi
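The net effect of the series, as a small sketch against an assumed SparkContext sc: sampling now keeps the parent partitioner, while mapPartitions still drops it unless the caller opts in:

  import org.apache.spark.HashPartitioner

  val pairs = sc.parallelize(Seq((0, 1), (2, 3))).partitionBy(new HashPartitioner(2))
  // sampling keeps the parent partitioner...
  assert(pairs.sample(withReplacement = false, fraction = 0.5).partitioner == pairs.partitioner)
  // ...while mapPartitions drops it unless the caller promises the keys are intact.
  assert(pairs.mapPartitions(iter => iter).partitioner == None)
  assert(pairs.mapPartitions(iter => iter, preservesPartitioning = true).partitioner == pairs.partitioner)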