
[SPARK-41513][SQL] Implement an accumulator to collect per mapper row count metrics #39057

Closed
wants to merge 8 commits

Conversation

amaliujia
Contributor

@amaliujia amaliujia commented Dec 14, 2022

What changes were proposed in this pull request?

In the current Spark optimizer, a single-partition shuffle may be created for a limit when the limit is not the last non-action operation (e.g. a filter follows the limit and the data size exceeds a threshold). The output partitions feeding into this limit may already be sorted. The single-partition shuffle approach has a correctness bug in that case: shuffle read partitions can arrive out of partition order, and the limit exec simply takes the first `limit` rows, which can break the ordering and produce a wrong result. The shuffle itself is also relatively costly. Meanwhile, a naive fix for the bug is to sort all the data fed into the limit again, which adds yet more overhead.

So we propose a row-count-based AQE algorithm that addresses this problem in two ways:

  1. Avoid the extra sort on the shuffle read side (or in the limit exec) that would otherwise be needed for a correct result.
  2. Avoid reading all shuffle data from the mappers for this single-partition shuffle, to reduce shuffle cost.

Note that 1. only applies to the sorted-partition case, while 2. applies to the general single-partition shuffle + limit case.

The algorithm works as follows:

  1. Each mapper records a row count when writing shuffle data.
  2. Since this is the single-shuffle-partition case, there is only one partition but N mappers.
  3. An AccumulatorV2 is implemented to collect a list of tuples recording the mapping between mapper id and the number of rows written by that mapper (the row count metrics).
  4. The AQE framework detects a plan shape of a shuffle followed by a global limit.
  5. The AQE framework reads only the necessary data from the mappers based on the limit. For example, if mapper 1 writes 200 rows, mapper 2 writes 300 rows, and the limit is 500, AQE creates a shuffle read node that reads from mappers 1 and 2 only, skipping the remaining mappers.
  6. This is correct for limits over both sorted and non-sorted partitions.
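Step 5 above can be sketched as a small prefix-selection routine. This is an illustrative sketch only (the names are hypothetical, not the actual Spark internals): given per-mapper row counts in mapper order and a limit, pick the minimal prefix of mappers whose combined row count covers the limit.

```scala
// Hypothetical sketch of step 5; not the real Spark implementation.
object MapperSelection {
  // rowCounts: (mapIndex, rowsWritten) pairs in mapper order.
  // Returns the map indexes the shuffle read node needs to read from.
  def mappersToRead(rowCounts: Seq[(Int, Long)], limit: Long): Seq[Int] = {
    var remaining = limit
    val selected = scala.collection.mutable.ArrayBuffer.empty[Int]
    val it = rowCounts.iterator
    // Take mappers until their cumulative row count covers the limit.
    while (remaining > 0 && it.hasNext) {
      val (mapIndex, rows) = it.next()
      selected += mapIndex
      remaining -= rows
    }
    selected.toSeq
  }
}
```

With the PR's example (mapper 1 writes 200 rows, mapper 2 writes 300, limit 500), only mappers 1 and 2 are selected and the remaining mappers are skipped.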

This PR is the first step toward implementing the idea in https://issues.apache.org/jira/browse/SPARK-41512: it implements a row count accumulator that will be used to collect row count metrics.

Why are the changes needed?

It is needed for an optimization algorithm for a global limit over a single-partition shuffle.

Does this PR introduce any user-facing change?

NO

How was this patch tested?

UT

@github-actions github-actions bot added the CORE label Dec 14, 2022
@amaliujia
Contributor Author

@cloud-fan

*
* @since 3.4.0
*/
class MapperRowCounter extends AccumulatorV2[jl.Long, java.util.List[java.util.List[jl.Long]]] {
Contributor


We can put it in the sql module, probably in the same file as the shuffle node.

Contributor Author


Moved to sql module.

@github-actions github-actions bot added SQL and removed CORE labels Dec 15, 2022
@AmplabJenkins

Can one of the admins verify this patch?


  def setPartitionId(id: Long): Unit = {
    this.synchronized {
      val p = id
Contributor


what does this do?

Contributor Author


hmm I can remove this assignment and use id directly.

*
* @since 3.4.0
*/
class MapperRowCounter extends AccumulatorV2[jl.Long, java.util.List[(jl.Long, jl.Long)]] {
Contributor


TaskContext.partitionId is int, do we really need long here?

Contributor Author


The first Long here (aka IN) comes from:

  /**
   * Takes the inputs and accumulates.
   */
  def add(v: IN): Unit

So this should be a long for the row count?

Contributor


row count should be long, map index should be int.

Contributor Author


I think I misunderstood the type parameter.

Changed to Integer for the map index.

amaliujia and others added 3 commits December 20, 2022 13:47
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>

  override def add(v: jl.Long): Unit = {
    this.synchronized {
      assert(!isZero, "agg must have been initialized")
Contributor


assert(getOrCreate.size == 1)?

Contributor Author


done

      if (isZero) {
        getOrCreate.add((id, 0))
      } else {
        val n = getOrCreate.get(0)._2
Contributor


when can we hit this branch?

Contributor Author

@amaliujia amaliujia Dec 22, 2022


This is because I don't know the invocation sequence of the accumulator APIs on the executor side, so I added this branch to be safe.

If setPartitionId is always called before any add, then we can assert on isZero and remove this branch.

What do you think?

Contributor


let's add an assert

Contributor Author


done
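Putting the review outcomes together (Int map index, Long row count, and an assert that setPartitionId runs before any add), the accumulator's semantics can be sketched in a self-contained way. This is a hypothetical simplification, not the actual AccumulatorV2 subclass from the PR: it drops the Spark base class and java.util types to show just the per-task register/add and driver-side merge behavior.

```scala
// Hypothetical sketch of MapperRowCounter's semantics (not the real Spark class).
class MapperRowCounterSketch {
  // One (mapIndex, rowCount) entry per map task; merged lists hold all mappers.
  private val counts = scala.collection.mutable.ListBuffer.empty[(Int, Long)]

  def isZero: Boolean = this.synchronized { counts.isEmpty }

  // Called exactly once per map task, before any add().
  def setPartitionId(id: Int): Unit = this.synchronized {
    assert(isZero, "setPartitionId must be called before any add")
    counts += ((id, 0L))
  }

  // Bump this task's row count as shuffle rows are written.
  def add(v: Long): Unit = this.synchronized {
    assert(counts.size == 1, "expect exactly one (mapIndex, rowCount) entry per task")
    val (id, n) = counts(0)
    counts(0) = (id, n + v)
  }

  // Driver side: concatenate entries collected by different tasks.
  def merge(other: MapperRowCounterSketch): Unit = this.synchronized {
    counts ++= other.value
  }

  def value: List[(Int, Long)] = this.synchronized { counts.toList }
}
```

After merging, the driver holds the full (mapIndex, rowCount) list that the AQE rule uses to decide which mappers' output to read.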

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 887f831 Dec 22, 2022