[SPARK-19426][SQL] Custom coalescer for Dataset #18861

Closed · wants to merge 5 commits
Changes from 2 commits
29 changes: 18 additions & 11 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -1158,8 +1158,17 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
  * Took this class out of the test suite to prevent "Task not serializable" exceptions.
  */
 class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {
+
+  def getPartitions(parent: RDD[_]): Array[Partition] = {
+    parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions
+  }
+
+  def getPartitionSize(partition: Partition): Long = {
+    partition.asInstanceOf[HadoopPartition].inputSplit.value.getLength
+  }
+
   override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
-    val partitions: Array[Partition] = parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions
+    val partitions = getPartitions(parent)
     val groups = ArrayBuffer[PartitionGroup]()
     var currentGroup = new PartitionGroup()
     var currentSum = 0L
@@ -1168,8 +1177,8 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {

     // sort partitions based on the size of the corresponding input splits
     partitions.sortWith((partition1, partition2) => {
-      val partition1Size = partition1.asInstanceOf[HadoopPartition].inputSplit.value.getLength
-      val partition2Size = partition2.asInstanceOf[HadoopPartition].inputSplit.value.getLength
+      val partition1Size = getPartitionSize(partition1)
+      val partition2Size = getPartitionSize(partition2)
       partition1Size < partition2Size
     })

@@ -1185,23 +1194,21 @@ class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {
       totalSum += splitSize
     }

-    while (index < partitions.size) {
+    while (index < partitions.length) {
       val partition = partitions(index)
-      val fileSplit =
-        partition.asInstanceOf[HadoopPartition].inputSplit.value.asInstanceOf[FileSplit]
-      val splitSize = fileSplit.getLength
+      val splitSize = getPartitionSize(partition)
       if (currentSum + splitSize < maxSize) {
         addPartition(partition, splitSize)
         index += 1
-        if (index == partitions.size) {
-          updateGroups
+        if (index == partitions.length) {
+          updateGroups()
         }
       } else {
-        if (currentGroup.partitions.size == 0) {
+        if (currentGroup.partitions.isEmpty) {
           addPartition(partition, splitSize)
           index += 1
         } else {
-          updateGroups
+          updateGroups()
Member: All the above changes are not related to this PR, right?

Member Author: Yeah, I just kept the changes from the original author (probably refactoring?)... Better to remove them?

Member: I am fine with them, but they might confuse others. Maybe just remove them in this PR? You can submit a separate PR later.

Member Author: OK, I'll drop these from this PR.
         }
       }
     }
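For context, a coalescer like this plugs into the existing `RDD.coalesce(numPartitions, shuffle, partitionCoalescer)` hook. A minimal usage sketch (the path and sizes are placeholders, and the input must be a `HadoopRDD`, since `SizeBasedCoalescer` casts partitions to `HadoopPartition`):

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Assumes an existing SparkContext `sc`; "/tmp/input" is a placeholder path.
// sc.hadoopFile produces a HadoopRDD, so each partition carries an InputSplit
// whose length SizeBasedCoalescer uses for grouping.
val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/tmp/input")

// Narrow coalesce: group input splits so each output partition holds at most ~128 MB.
val coalesced = lines.coalesce(8, shuffle = false, Some(new SizeBasedCoalescer(128 * 1024 * 1024)))
```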
@@ -17,12 +17,12 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.rdd.PartitionCoalescer
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.util.random.RandomSampler
@@ -752,6 +752,16 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
Member: Add a new require here?

```scala
require(!shuffle || coalescer.isEmpty, "Custom coalescer is not allowed for repartition(shuffle=true)")
```

Member Author: OK.
}

/**
 * Returns a new RDD that has at most `numPartitions` partitions. A user-supplied
 * `PartitionCoalescer` controls how the parent partitions are grouped together.
 */
case class PartitionCoalesce(numPartitions: Int, coalescer: PartitionCoalescer, child: LogicalPlan)
extends UnaryNode {
Member: Adding new logical nodes also needs updates in multiple different components (e.g., the Optimizer). Is it possible to reuse the existing node Repartition?

Member Author: Yeah, I think so. I'll try; please give me a few days to do so.
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
override def output: Seq[Attribute] = child.output
}
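To make the review thread above concrete, here is a rough sketch of the reviewer's alternative, where the existing `Repartition` node carries an optional coalescer instead of introducing a new node. The field name and default are assumptions, not part of this revision of the PR:

```scala
// Sketch only: reuse Repartition instead of adding PartitionCoalesce.
// The extra field and the !shuffle guard follow the suggestion above.
case class Repartition(
    numPartitions: Int,
    shuffle: Boolean,
    child: LogicalPlan,
    coalescer: Option[PartitionCoalescer] = None)
  extends UnaryNode {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  require(!shuffle || coalescer.isEmpty,
    "Custom coalescer is not allowed for repartition(shuffle=true)")
  override def output: Seq[Attribute] = child.output
}
```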

/**
* This method repartitions data using [[Expression]]s into `numPartitions`, and receives
* information about the number of partitions during execution. Used when a specific ordering or
31 changes: 30 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function._
 import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{PartitionCoalescer, RDD}
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.CatalogRelation
@@ -2661,6 +2661,35 @@ class Dataset[T] private[sql](
partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
}

/**
* Returns a new Dataset whose partitions are reduced by a user-defined `PartitionCoalescer`.
* `userDefinedCoalescer` is the same kind of coalescer as the one used in `RDD.coalesce`.
*
* If a larger number of partitions is requested, it will stay at the current
* number of partitions. Similar to coalesce defined on an `RDD`, this operation results in
* a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not
* be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can call repartition. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* @group typedrel
* @since 2.3.0
*/
def coalesce(numPartitions: Int, userDefinedCoalescer: Option[PartitionCoalescer]): Dataset[T] = {
userDefinedCoalescer.map { coalescer =>
withTypedPlan {
PartitionCoalesce(numPartitions, coalescer, logicalPlan)
}
}.getOrElse {
coalesce(numPartitions)
}
}
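A short usage sketch of the proposed API (assumes a SparkSession `spark` and the `DatasetSizeBasedPartitionCoalescer` test helper added to `DatasetSuite` below; the path is a placeholder):

```scala
val df = spark.read.format("csv").schema("a STRING, b INT").load("/tmp/data")

// Planned as PartitionCoalesce -> CoalesceExec(numPartitions, child, Some(coalescer)).
val custom = df.coalesce(4, Some(new DatasetSizeBasedPartitionCoalescer(512)))

// Passing None falls back to the existing coalesce(numPartitions) behavior.
val plain = df.coalesce(4, None)
```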

/**
* Returns a new Dataset that has exactly `numPartitions` partitions, when the fewer partitions
* are requested. If a larger number of partitions is requested, it will stay at the current
@@ -394,8 +394,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         if (shuffle) {
           ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
         } else {
-          execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
+          execution.CoalesceExec(numPartitions, planLater(child), None) :: Nil
         }
+      case logical.PartitionCoalesce(numPartitions, coalescer, child) =>
+        execution.CoalesceExec(numPartitions, planLater(child), Some(coalescer)) :: Nil
       case logical.Sort(sortExprs, global, child) =>
         execution.SortExec(sortExprs, global, planLater(child)) :: Nil
       case logical.Project(projectList, child) =>
@@ -20,14 +20,13 @@ package org.apache.spark.sql.execution
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

-import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext}
-import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD}
+import org.apache.spark.{InterruptibleIterator, TaskContext}
+import org.apache.spark.rdd.{EmptyRDD, PartitionCoalescer, PartitionwiseSampledRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
import org.apache.spark.sql.types.LongType
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
@@ -571,7 +570,8 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
  * current upstream partitions will be executed in parallel (per whatever
  * the current partitioning is).
  */
-case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
+case class CoalesceExec(numPartitions: Int, child: SparkPlan, coalescer: Option[PartitionCoalescer])
Member: Could you add a parameter description for coalescer, and also update the function description? Thanks!

Member Author: OK!
+  extends UnaryExecNode {
   override def output: Seq[Attribute] = child.output

   override def outputPartitioning: Partitioning = {
@@ -580,7 +580,7 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
   }

   protected override def doExecute(): RDD[InternalRow] = {
-    child.execute().coalesce(numPartitions, shuffle = false)
+    child.execute().coalesce(numPartitions, shuffle = false, coalescer)
   }
 }
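One possible wording for the parameter description requested in the review above (an assumption about phrasing, not the PR's final text):

```scala
/**
 * Physical plan node that coalesces its child's output into fewer partitions without a shuffle.
 *
 * @param coalescer optional user-defined [[org.apache.spark.rdd.PartitionCoalescer]]; when set,
 *                  it is passed to `RDD.coalesce(numPartitions, shuffle = false, coalescer)` to
 *                  decide how parent partitions are grouped, otherwise Spark's default
 *                  coalescing strategy is used.
 */
```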

47 changes: 47 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -20,10 +20,13 @@ package org.apache.spark.sql
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.sql.{Date, Timestamp}

+import org.apache.spark.Partition
+import org.apache.spark.rdd._
 import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec}
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
@@ -119,6 +122,39 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
data: _*)
}

test("coalesce, custom") {
withTempPath { path =>
val maxSplitSize = 512
val testData = (1 to 1000).map(i => ClassData(i.toString, i))
testData.toDS().repartition(50).write.format("csv").save(path.toString)

withSQLConf(
SQLConf.FILES_MAX_PARTITION_BYTES.key -> (maxSplitSize / 3).toString,
SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "0"
) {
val ds = spark.read.format("csv")
.schema("a STRING, b INT")
.load(path.toString)
.as[ClassData]

val coalescedDataSet =
ds.coalesce(4, Some(new DatasetSizeBasedPartitionCoalescer(maxSplitSize)))

assert(coalescedDataSet.rdd.partitions.length <= 50)

val expectedPartitionCount = ds.rdd.partitions.size
val totalPartitionCount = coalescedDataSet.rdd.partitions.map { p1 =>
val splitSizes = p1.asInstanceOf[CoalescedRDDPartition].parents.map { p2 =>
p2.asInstanceOf[FilePartition].files.map(_.length).sum
}
assert(splitSizes.sum <= maxSplitSize)
splitSizes.size
}.sum
assert(totalPartitionCount === expectedPartitionCount)
}
}
}

test("as tuple") {
val data = Seq(("a", 1), ("b", 2)).toDF("a", "b")
checkDataset(
@@ -1402,3 +1438,14 @@ case class CircularReferenceClassB(cls: CircularReferenceClassA)
case class CircularReferenceClassC(ar: Array[CircularReferenceClassC])
case class CircularReferenceClassD(map: Map[String, CircularReferenceClassE])
case class CircularReferenceClassE(id: String, list: List[CircularReferenceClassD])

class DatasetSizeBasedPartitionCoalescer(maxSize: Int) extends SizeBasedCoalescer(maxSize) {

override def getPartitions(parent: RDD[_]): Array[Partition] = {
parent.firstParent.partitions
}

override def getPartitionSize(partition: Partition): Long = {
partition.asInstanceOf[FilePartition].files.map(x => x.length - x.start).sum
}
}
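Finally, for anyone writing their own coalescer against this API: the only contract is `PartitionCoalescer.coalesce`. A minimal, generic sketch (not part of this PR) that chunks consecutive parent partitions into fixed-size groups:

```scala
import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}

/** Groups every `chunkSize` consecutive parent partitions into one output partition. */
class FixedChunkCoalescer(chunkSize: Int) extends PartitionCoalescer with Serializable {
  require(chunkSize > 0, "chunkSize must be positive")

  // Like SizeBasedCoalescer above, this ignores maxPartitions and lets its own
  // parameter drive the grouping.
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    parent.partitions.grouped(chunkSize).map { chunk =>
      val group = new PartitionGroup()
      chunk.foreach(p => group.partitions += p)
      group
    }.toArray
  }
}
```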