Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-42942][SQL] Support coalesce table cache stage partitions #40574

Closed

Conversation

ulysses-you
Copy link
Contributor

What changes were proposed in this pull request?

Add a new rule CoalesceCachePartitions to support coalesce partitions with TableCacheQueryStageExec. In order to reuse the code path with CoalesceShufflePartitions, this pr also does a small refactor about how we coalesce partitions.

RDD cache use the RDD id and partition id as the block id, so it seems not possible to split skewd partitions like shuffle. To reduce complexity, this pr does not allow coalesce partitions with both shuffle and cache stage since shuffle read may contain skewed partition spec.

For example, the follow case can not be coalesced by both CoalesceCachePartitions and CoalesceShufflePartitions.

SMJ
  ShuffleQueryStage
  TableCacheStage

Why are the changes needed?

Make AQE support coalesce table cache stage partitions.

Does this PR introduce any user-facing change?

yes, add a new config to control if coalesce partitions for table cache stage.

How was this patch tested?

add tests

@github-actions github-actions bot added the SQL label Mar 28, 2023
@ulysses-you
Copy link
Contributor Author

cc @dongjoon-hyun @cloud-fan @viirya @yaooqinn thank you

/**
* The [[Partition]] used by [[CachedRDD]].
*/
case class CachedRDDPartition(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some PR comments to indicate which code are copied from somewhere and which code are new?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most are from CoalescedRDD,

private[spark] class CoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
maxPartitions: Int,
partitionCoalescer: Option[PartitionCoalescer] = None)
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies

that has similar use case to coalesce RDD partition. The main difference is CachedRDD know its target partitions which make things simple. All core methods in CachedRDD can reuse the origin of previous RDD.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if that's the case, is there a reason why we can not implement this with CoalescedRDD and supplying the correct partitionCoalescer: Option[PartitionCoalescer]? It feels a little strange having an RDD classes code copied from core to SQL.

Copy link
Contributor

@jaceklaskowski jaceklaskowski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is so much to review...just skimmed over the PR.

// `RoundRobinPartitioning` but we don't need to retain the number of partitions.
case r: RoundRobinPartitioning =>
r.copy(numPartitions = numPartitions)
case other@SinglePartition =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: spaces around @?

r.copy(numPartitions = numPartitions)
case other@SinglePartition =>
throw new IllegalStateException(
"Unexpected partitioning for coalesced shuffle read: " + other)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/Unexpected/Illegal ?

case _ =>
// Spark plugins may have custom partitioning and may replace this operator
// during the postStageOptimization phase, so return UnknownPartitioning here
// rather than throw an exception
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this comment a DEBUG message?

Comment on lines +25 to +27
abstract class AQERead extends UnaryExecNode {
def child: SparkPlan
def partitionSpecs: Seq[ShufflePartitionSpec]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a comment about what AQERead is/does?

Comment on lines +48 to +52
if (specsMap.nonEmpty) {
updateCacheReads(plan, specsMap)
} else {
plan
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this only applies to reads?

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Oct 17, 2023
@github-actions github-actions bot closed this Oct 18, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants