[SPARK-19426][SQL] Custom coalescer for Dataset #18861

Status: Closed · wants to merge 5 commits
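
For orientation before the diffs: this PR threads Spark's existing RDD-level developer API for custom coalescing through Dataset and the SQL planner. A paraphrased sketch of that API follows (shape as in Spark 2.x under org.apache.spark.rdd; not part of this diff; see CoalescedRDD.scala upstream for the authoritative definitions):

import scala.collection.mutable

import org.apache.spark.Partition
import org.apache.spark.rdd.RDD

// A PartitionCoalescer decides how the parent RDD's partitions are grouped
// into the (fewer) output partitions.
trait PartitionCoalescer {
  def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup]
}

// A PartitionGroup is one output partition plus the parent partitions it claims.
class PartitionGroup(val prefLoc: Option[String] = None) {
  val partitions = mutable.ArrayBuffer[Partition]()
  def numPartitions: Int = partitions.size
}
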
19 changes: 13 additions & 6 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -1158,8 +1158,17 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
  * Took this class out of the test suite to prevent "Task not serializable" exceptions.
  */
 class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {
+
+  def getPartitions(parent: RDD[_]): Array[Partition] = {
+    parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions
+  }
+
+  def getPartitionSize(partition: Partition): Long = {
+    partition.asInstanceOf[HadoopPartition].inputSplit.value.getLength
+  }
+
   override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
-    val partitions: Array[Partition] = parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions
+    val partitions = getPartitions(parent)
     val groups = ArrayBuffer[PartitionGroup]()
     var currentGroup = new PartitionGroup()
     var currentSum = 0L
@@ -1168,8 +1177,8 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {

     // sort partitions based on the size of the corresponding input splits
     partitions.sortWith((partition1, partition2) => {
-      val partition1Size = partition1.asInstanceOf[HadoopPartition].inputSplit.value.getLength
-      val partition2Size = partition2.asInstanceOf[HadoopPartition].inputSplit.value.getLength
+      val partition1Size = getPartitionSize(partition1)
+      val partition2Size = getPartitionSize(partition2)
       partition1Size < partition2Size
     })
@@ -1187,9 +1196,7 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {

     while (index < partitions.size) {
       val partition = partitions(index)
-      val fileSplit =
-        partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit]
-      val splitSize = fileSplit.getLength
+      val splitSize = getPartitionSize(partition)
       if (currentSum + splitSize < maxSize) {
         addPartition(partition, splitSize)
         index += 1
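
As a rough usage sketch (not part of the diff): the class above plugs into the optional `partitionCoalescer` parameter that `RDD.coalesce` has carried since Spark 2.0. This assumes a live SparkContext `sc` and a hypothetical input path; the 512-byte cap is deliberately tiny, for illustration only.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Read a directory of text files as a HadoopRDD, then pack its input splits
// into groups whose total split size stays under maxSize bytes.
val hadoopRDD = sc.hadoopFile(
  "/tmp/size-based-coalesce-input", // hypothetical path
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
val coalesced = hadoopRDD.coalesce(
  2, partitionCoalescer = Some(new SizeBasedCoalescer(512)))
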
@@ -596,14 +596,17 @@ object CollapseProject extends Rule[LogicalPlan] {
 object CollapseRepartition extends Rule[LogicalPlan] {

Member: Please also add new test cases to CollapseRepartitionSuite for the changes in this rule.

Member (Author): ok

   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     // Case 1: When a Repartition has a child of Repartition or RepartitionByExpression,
-    // 1) When the top node does not enable the shuffle (i.e., coalesce API), but the child
-    //    enables the shuffle. Returns the child node if the last numPartitions is bigger;
-    //    otherwise, keep unchanged.
+    // 1) When the top node does not enable the shuffle (i.e., coalesce with no user-specified
+    //    strategy), but the child enables the shuffle. Returns the child node if the last
+    //    numPartitions is bigger; otherwise, keep unchanged.
     // 2) In the other cases, returns the top node with the child's child
-    case r @ Repartition(_, _, child: RepartitionOperation) => (r.shuffle, child.shuffle) match {
-      case (false, true) => if (r.numPartitions >= child.numPartitions) child else r
-      case _ => r.copy(child = child.child)
-    }
+    case r @ Repartition(_, _, child: RepartitionOperation, coalescer) =>
+      (r.shuffle, child.shuffle) match {
+        case (false, true) =>
+          if (coalescer.isEmpty && r.numPartitions >= child.numPartitions) child else r
+        case _ =>
+          r.copy(child = child.child)
+      }
     // Case 2: When a RepartitionByExpression has a child of Repartition or RepartitionByExpression
     //         we can remove the child.
     case r @ RepartitionByExpression(_, child: RepartitionOperation, _) =>
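
For intuition, a minimal sketch of what Case 1 now does (hypothetical local session; not part of the diff). A plain coalesce on top of a shuffling repartition still collapses to the child when it asks for at least as many partitions, while a coalesce carrying a user-specified coalescer is left alone so the custom strategy still runs.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("collapse-demo").getOrCreate()
import spark.implicits._

val df = (1 to 100).toDF("id")

// 20 >= 10 and no custom coalescer: the optimizer drops the coalesce and
// keeps only the shuffling repartition(10) in the optimized plan.
df.repartition(10).coalesce(20).explain(true)

// With this PR's overload and a hypothetical coalescer value `myCoalescer`,
// the same shape would NOT collapse, because coalescer.isEmpty is false:
//   df.repartition(10).coalesce(20, Some(myCoalescer))
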
@@ -17,12 +17,12 @@

 package org.apache.spark.sql.catalyst.plans.logical

+import org.apache.spark.rdd.PartitionCoalescer
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 import org.apache.spark.util.random.RandomSampler
@@ -746,10 +746,24 @@ abstract class RepartitionOperation extends UnaryNode {
  * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
  * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
  * of the output requires some specific ordering or distribution of the data.
+ *
+ * If `shuffle` = false (`coalesce` cases), this logical plan can have a user-specified strategy
+ * to coalesce input partitions.
+ *
+ * @param numPartitions How many partitions to use in the output RDD
+ * @param shuffle Whether to shuffle when repartitioning
+ * @param child the LogicalPlan
+ * @param coalescer Optional coalescer that a user specifies
  */
-case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
+case class Repartition(
+    numPartitions: Int,
+    shuffle: Boolean,
+    child: LogicalPlan,
+    coalescer: Option[PartitionCoalescer] = None)
   extends RepartitionOperation {
   require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

Member: Add a new require here?

    require(!shuffle || coalescer.isEmpty, "Custom coalescer is not allowed for repartition(shuffle=true)")

Member (Author): ok

+  require(!shuffle || coalescer.isEmpty,
+    "Custom coalescer is not allowed for repartition(shuffle=true)")
 }

 /**
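
A standalone sketch of the new guard's effect (hypothetical stub coalescer; plan construction reduced to a bare LocalRelation, so this is a REPL-style snippet, not suite code):

import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Repartition}

// Hypothetical no-op coalescer, just enough to exercise the constructor guard.
object StubCoalescer extends PartitionCoalescer {
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] =
    Array.empty
}

Repartition(4, shuffle = false, LocalRelation(Nil), Some(StubCoalescer)) // ok: coalesce path
// Repartition(4, shuffle = true, LocalRelation(Nil), Some(StubCoalescer))
//   => IllegalArgumentException: requirement failed:
//      Custom coalescer is not allowed for repartition(shuffle=true)
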
27 changes: 26 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function._
 import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{PartitionCoalescer, RDD}
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.CatalogRelation
@@ -2661,6 +2661,31 @@ class Dataset[T] private[sql](
     partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
   }

+  /**
+   * Returns a new Dataset reduced into fewer partitions by a user-defined `PartitionCoalescer`.
+   * `userDefinedCoalescer` is the same kind of coalescer that the `RDD` coalesce function accepts.
+   *
+   * If a larger number of partitions is requested, it will stay at the current
+   * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in
+   * a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not
+   * be a shuffle; instead each of the 100 new partitions will claim 10 of the current partitions.
+   *
+   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
+   * this may result in your computation taking place on fewer nodes than
+   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+   * you can call repartition. This will add a shuffle step, but means the
+   * current upstream partitions will be executed in parallel (per whatever
+   * the current partitioning is).
+   *
+   * @group typedrel
+   * @since 2.3.0
+   */
+  def coalesce(
+      numPartitions: Int,
+      userDefinedCoalescer: Option[PartitionCoalescer]): Dataset[T] = withTypedPlan {
+    Repartition(numPartitions, shuffle = false, logicalPlan, userDefinedCoalescer)
+  }
+
   /**
    * Returns a new Dataset that has exactly `numPartitions` partitions, when the fewer partitions
    * are requested. If a larger number of partitions is requested, it will stay at the current
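
A hedged end-to-end sketch of the new overload (hypothetical path and sizes; reuses the DatasetSizeBasedPartitionCoalescer defined in DatasetSuite further down):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("coalesce-demo").getOrCreate()
import spark.implicits._

// Write some small CSV files, then read them back and compact the input
// partitions by input-split size instead of by a fixed count.
val path = "/tmp/coalesce-demo-csv" // hypothetical path
(1 to 1000).map(i => (i.toString, i)).toDF("a", "b")
  .repartition(50).write.format("csv").save(path)

val ds = spark.read.format("csv").schema("a STRING, b INT").load(path)
val compacted = ds.coalesce(4, Some(new DatasetSizeBasedPartitionCoalescer(512)))
println(compacted.rdd.getNumPartitions)
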
@@ -390,11 +390,11 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr,
         planLater(left), planLater(right)) :: Nil

-      case logical.Repartition(numPartitions, shuffle, child) =>
+      case logical.Repartition(numPartitions, shuffle, child, coalescer) =>
         if (shuffle) {
           ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
         } else {
-          execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
+          execution.CoalesceExec(numPartitions, planLater(child), coalescer) :: Nil
         }
       case logical.Sort(sortExprs, global, child) =>
         execution.SortExec(sortExprs, global, planLater(child)) :: Nil
@@ -20,14 +20,13 @@ package org.apache.spark.sql.execution
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration

-import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext}
-import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD}
+import org.apache.spark.{InterruptibleIterator, TaskContext}
+import org.apache.spark.rdd.{EmptyRDD, PartitionCoalescer, PartitionwiseSampledRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
 import org.apache.spark.sql.types.LongType
 import org.apache.spark.util.ThreadUtils
 import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
@@ -561,7 +560,7 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
 * Physical plan for returning a new RDD that has exactly `numPartitions` partitions.
 * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
 * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
 * the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions
 * is requested, it will stay at the current number of partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
@@ -570,8 +569,7 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
 * you see ShuffleExchange. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
+ *
+ * If you want to define how to coalesce partitions, you can set a custom strategy
+ * to coalesce partitions in `coalescer`.
+ *
+ * @param numPartitions Number of partitions this coalescer tries to reduce the input into
+ * @param child the SparkPlan
+ * @param coalescer Optional coalescer that a user specifies
 */
-case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
+case class CoalesceExec(numPartitions: Int, child: SparkPlan, coalescer: Option[PartitionCoalescer])
+  extends UnaryExecNode {
   override def output: Seq[Attribute] = child.output

   override def outputPartitioning: Partitioning = {
@@ -580,7 +587,7 @@
   }

   protected override def doExecute(): RDD[InternalRow] = {
-    child.execute().coalesce(numPartitions, shuffle = false)
+    child.execute().coalesce(numPartitions, shuffle = false, coalescer)
   }
 }
47 changes: 47 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -20,10 +20,13 @@ package org.apache.spark.sql
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}

+import org.apache.spark.Partition
+import org.apache.spark.rdd._
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
@@ -119,6 +122,39 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       data: _*)
   }

+  test("coalesce, custom") {
+    withTempPath { path =>
+      val maxSplitSize = 512
+      val testData = (1 to 1000).map(i => ClassData(i.toString, i))
+      testData.toDS().repartition(50).write.format("csv").save(path.toString)
+
+      withSQLConf(
+        SQLConf.FILES_MAX_PARTITION_BYTES.key -> (maxSplitSize / 3).toString,
+        SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "0"
+      ) {
+        val ds = spark.read.format("csv")
+          .schema("a STRING, b INT")
+          .load(path.toString)
+          .as[ClassData]
+
+        val coalescedDataSet =
+          ds.coalesce(4, Some(new DatasetSizeBasedPartitionCoalescer(maxSplitSize)))
+
+        assert(coalescedDataSet.rdd.partitions.length <= 50)
+
+        val expectedPartitionCount = ds.rdd.partitions.size
+        val totalPartitionCount = coalescedDataSet.rdd.partitions.map { p1 =>
+          val splitSizes = p1.asInstanceOf[CoalescedRDDPartition].parents.map { p2 =>
+            p2.asInstanceOf[FilePartition].files.map(_.length).sum
+          }
+          assert(splitSizes.sum <= maxSplitSize)
+          splitSizes.size
+        }.sum
+        assert(totalPartitionCount === expectedPartitionCount)
+      }
+    }
+  }
+
   test("as tuple") {
     val data = Seq(("a", 1), ("b", 2)).toDF("a", "b")
     checkDataset(
@@ -1402,3 +1438,14 @@ case class CircularReferenceClassB(cls: CircularReferenceClassA)
 case class CircularReferenceClassC(ar: Array[CircularReferenceClassC])
 case class CircularReferenceClassD(map: Map[String, CircularReferenceClassE])
 case class CircularReferenceClassE(id: String, list: List[CircularReferenceClassD])
+
+class DatasetSizeBasedPartitionCoalescer(maxSize: Int) extends SizeBasedCoalescer(maxSize) {
+
+  override def getPartitions(parent: RDD[_]): Array[Partition] = {
+    parent.firstParent.partitions
+  }
+
+  override def getPartitionSize(partition: Partition): Long = {
+    partition.asInstanceOf[FilePartition].files.map(x => x.length - x.start).sum
+  }
+}
@@ -244,7 +244,7 @@ class PlannerSuite extends SharedSQLContext {
     assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
     assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 2)
     doubleRepartitioned.queryExecution.optimizedPlan match {
-      case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, _)) =>
+      case Repartition(numPartitions, shuffle, Repartition(_, shuffleChild, _, _), _) =>
         assert(numPartitions === 5)
        assert(shuffle === false)
        assert(shuffleChild === true)