Skip to content

Comments

[WIP][SPARK-37487][SQL][CORE] Avoid performing CollectMetrics twice if the operation is followed by global sort.#34765

Closed
sarutak wants to merge 1 commit intoapache:masterfrom
sarutak:SPARK-37487
Closed

[WIP][SPARK-37487][SQL][CORE] Avoid performing CollectMetrics twice if the operation is followed by global sort.#34765
sarutak wants to merge 1 commit intoapache:masterfrom
sarutak:SPARK-37487

Conversation

@sarutak
Copy link
Member

@sarutak sarutak commented Dec 1, 2021

What changes were proposed in this pull request?

This PR fixes an issue that CollectMetrics performs twice if it's followed by global sort like as follows.

val df = spark.range(100)
  .observe(
    name = "my_event",
    min($"id").as("min_val"),
    max($"id").as("max_val"),
    sum($"id"),
    count(when($"id" % 2 === 0, 1)).as("num_even"))
  .sort($"id".desc)

The expected statistics calculated by CollectMetrics is [0,99,4950,50] but the actual result is [0,99,9900,100].
The reason is that jobs for sampling can run before the global sort, which performs extra CollectMetrics.

val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)

val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()

The solution this PR proposes to introduce a property spark.job.isSamplingJob which is intended to be get/set internally.
Before the sampling jobs run, Spark sets the property, and reset it after the jobs finish.
Then, CollectMetrics can judge a task is whether of a sampling job or not.

Why are the changes needed?

Bug fix.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New test.

@sarutak sarutak changed the title [SPARK-37487][SQL][CORE] Avoid performing CollectMetrics twice if the operation is folloed by global sort. [SPARK-37487][SQL][CORE] Avoid performing CollectMetrics twice if the operation is followed by global sort. Dec 1, 2021
@tanelk
Copy link
Contributor

tanelk commented Dec 1, 2021

Does this bug also impact the metrics reported by other nodes? For example numOutputRows in FilterExec.

@sarutak
Copy link
Member Author

sarutak commented Dec 1, 2021

Does this bug also impact the metrics reported by other nodes? For example numOutputRows in FilterExec.

Seems so. We need a more comprehensive solution...

@sarutak sarutak changed the title [SPARK-37487][SQL][CORE] Avoid performing CollectMetrics twice if the operation is followed by global sort. [WIP][SPARK-37487][SQL][CORE] Avoid performing CollectMetrics twice if the operation is followed by global sort. Dec 1, 2021
@sarutak sarutak marked this pull request as draft December 1, 2021 07:08
@SparkQA
Copy link

SparkQA commented Dec 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50269/

@SparkQA
Copy link

SparkQA commented Dec 1, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50269/

@SparkQA
Copy link

SparkQA commented Dec 1, 2021

Test build #145798 has finished for PR 34765 at commit 891fa19.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 12, 2022
@github-actions github-actions bot closed this Mar 13, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants