
[SPARK-28699][SQL] Disable using radix sort for ShuffleExchangeExec in repartition case #25491

Closed

Conversation

@xuanyuanking (Member) commented Aug 19, 2019

What changes were proposed in this pull request?

Disable using radix sort in ShuffleExchangeExec when we do repartition.
In #20393, we fixed the nondeterministic result in the shuffle repartition case by performing a local sort before repartitioning.
But the newly added sort operation uses radix sort, which is wrong here because binary data can't be compared by the prefix alone. This makes the sort unstable, so it fails to solve the indeterminate shuffle output problem.
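
To illustrate why prefix-only comparison is not enough here, consider a minimal sketch (illustrative only, not Spark internals): two distinct binary rows can share the same 8-byte sort prefix, so a sort that consults only the prefix can order them either way across reruns.

```scala
// Two distinct binary rows that share the same 8-byte sort prefix. A sort
// keyed only on the prefix treats them as equal, so their relative order
// depends on the input order: exactly the instability that breaks
// repartition-rerun correctness.
val rowA = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9)   // shared prefix, suffix 9
val rowB = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 10)  // shared prefix, suffix 10
val prefix = (row: Array[Byte]) => row.take(8).toSeq
assert(prefix(rowA) == prefix(rowB)) // the prefixes tie...
assert(!(rowA sameElements rowB))    // ...but the rows differ
```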

Why are the changes needed?

Fix the correctness bug caused by repartition after a shuffle.

Does this PR introduce any user-facing change?

Yes, users will get the correct result when a repartition stage is rerun.

How was this patch tested?

Tested with local-cluster[5, 2, 5120] using the integration test below; it returns the correct answer, 100000000.

import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// Kill an executor in the stage that performs repartition(239): the first
// attempt of the first task runs "pkill -f -n java" via .!!, which kills the
// most recently started JVM (an executor) and forces a stage rerun.
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()

@cloud-fan (Contributor) commented Aug 19, 2019

cc @kiszk @dongjoon-hyun since you are the release managers. I'd like to fix this in 2.3/2.4 as it's a correctness bug.

cc @jiangxb1987 @tgravescs , I think this is the right fix, but might be too expensive as normal sort is much slower than radix sort. I'm a little worried about introducing a big perf regression to 2.3/2.4.

The new fix of the repartition bug was introduced in Spark 2.4 and works well so far. Shall we just disable the sort and always use the new fix?

@hvanhovell (Contributor)

I am not sure I follow. Radix sort should only be selected if all sort fields can fit in the prefix, so why do we need to explicitly disable it?

@@ -264,7 +264,7 @@ object ShuffleExchangeExec {
        prefixComparator,
        prefixComputer,
        pageSize,
-       canUseRadixSort)
+       false /* canUseRadixSort */)
@hvanhovell (Contributor), Aug 19, 2019:

Why not use the same check as used in SortExec? That would be something like:

canUseRadixSort && outputAttributes.length == 1 && SortPrefixUtils.canSortFullyWithPrefix(SortOrder(outputAttributes.head, Ascending))

Contributor:

We are comparing binary here, so SortPrefixUtils.canSortFullyWithPrefix always returns false.

Contributor:

Ah, OK, I did not know that we use binary comparisons. Is this because of Map not being comparable? If it is, that might be problematic in itself, because you'd expect the retried stage to return map elements in the same order, right?

Contributor:

Anyway, back to the code, the change looks good to me. It might help to add a more insightful comment here.

@cloud-fan (Contributor), Aug 19, 2019:

> Is this because of Map not being comparable?

Yes, and it's also because we don't have to do an expensive normal sort.

The only problem we want to fix here is that the inputs arrive in a random order. We just want the inputs to have a stable order; we don't really care what that order is. So comparing via the UnsafeRow binary format is good enough here.
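
To make that concrete, here is a minimal sketch of the idea (illustrative only; the comparator Spark actually uses here is RecordBinaryComparator, discussed later in this thread): any fixed, deterministic total order on the raw bytes is sufficient, even if it is semantically meaningless.

```scala
// A deterministic, semantically meaningless total order on raw bytes.
// Sorting by such an order makes the local sort output stable across stage
// reruns, which is all the repartition fix needs.
def compareBinary(a: Array[Byte], b: Array[Byte]): Int = {
  val len = math.min(a.length, b.length)
  var i = 0
  while (i < len) {
    val cmp = (a(i) & 0xff) - (b(i) & 0xff) // compare bytes as unsigned
    if (cmp != 0) return cmp
    i += 1
  }
  a.length - b.length // on a shared prefix, the shorter row sorts first
}
```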

Contributor:

Agreed, a better comment would be nice here; specifically, reference the JIRA or explain why we don't enable radix sort.

Member Author:

Yes, we compare the UnsafeRow binary here, and the sort must produce a stable result; otherwise it causes the correctness bug.
Thanks for the advice; I've added more comments.

@cloud-fan (Contributor)

> Radix sort should only be selected if all sort fields can fit in the prefix

Yes, but this check is missing in ShuffleExchangeExec, see
https://github.com/apache/spark/pull/25491/files#diff-3ceee31a3da1b7c71dddd32f666126fbL245

@SparkQA commented Aug 19, 2019

Test build #109332 has finished for PR 25491 at commit 1256c87.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor)

Did anyone measure the performance impact?

> The new fix of the repartition bug was introduced in Spark 2.4 and works well so far. Shall we just disable the sort and always use the new fix?

You mean disable #20393?

@cloud-fan (Contributor)

@tgravescs Yes, disable spark.sql.execution.sortBeforeRepartition.

@SparkQA commented Aug 19, 2019

Test build #109350 has finished for PR 25491 at commit 398a891.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member)

Oh, thank you for pinging me, @cloud-fan. (I just saw your ping.)

@maropu changed the title from "[SPARK-28699][Core] Disable using radix sort for ShuffleExchangeExec in repartition case" to "[SPARK-28699][SQL] Disable using radix sort for ShuffleExchangeExec in repartition case" on Aug 20, 2019
@kiszk (Member) commented Aug 20, 2019

// As we need to compare the binary form of UnsafeRow here, we can't make sure
// that all the fields can be sorted fully by prefix, as in SortExec. So we
// disable radix sort here to avoid an unstable sort, which would result in a
// correctness bug. See more details in SPARK-28699.
Contributor:

How about:

we are comparing binary here, which does not support radix sort. See more details in SPARK-28699.

Member Author:

Copy that; shortened it in 8bf4cc8.

Member:

Hi, @cloud-fan. Is this the only PR for this topic, or do we have more TODO items against the old branches (branch-2.4 and branch-2.3)?

@cloud-fan (Contributor), Aug 20, 2019:

I have one more: #25498, which will be merged soon, once tests pass.

The current PR may need more time. We need to collect feedback about whether we should disable "sort before repartition" by default. cc @jiangxb1987

@SparkQA commented Aug 20, 2019

Test build #109376 has finished for PR 25491 at commit ec57677.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 20, 2019

Test build #109382 has finished for PR 25491 at commit 8bf4cc8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal (Contributor)

retest this please

@SparkQA commented Aug 20, 2019

Test build #109389 has finished for PR 25491 at commit 8bf4cc8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor)

I'm OK either way; I guess it comes down to the performance implications, which is why I asked if anyone had measured them yet. If we disable the sort, performance is back to normal until something fails, and the worst case is worse because the job could fail outright. With the sort, the job will at least run, just possibly slower. For 2.3 and 2.4 I would lean towards leaving the sort on, so that users' jobs don't start failing unexpectedly, and we should make sure to document this in the release notes.

@SparkQA commented Aug 20, 2019

Test build #109405 has finished for PR 25491 at commit c692265.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking (Member Author)

[image: benchmark results]

Attaching my benchmark results here. Although radix sort can give better performance in a single-data-shuffle scenario, considering the tradeoff between its limited applicability and the added code complexity, I reverted it in the last commit.

For the types that can't use radix sort, we don't have many choices because of the correctness bug.

@SparkQA commented Aug 21, 2019

Test build #109472 has finished for PR 25491 at commit 2e26335.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang (Member)

retest this please.

@cloud-fan (Contributor) commented Aug 21, 2019

The overhead looks acceptable to me (considering there is already a big overhead in "sort before repartition"). I'm +1 to merge it as is. As @tgravescs said, we may fail jobs if we turn off "sort before repartition", which can be bad for backporting.

@SparkQA commented Aug 21, 2019

Test build #109481 has finished for PR 25491 at commit 2e26335.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member) left a comment:

+1, LGTM.

@dongjoon-hyun (Member)

Since this PR introduces an unavoidable performance degradation, I'll cc once more.
cc @gatorsmile , @tgravescs , @rxin , @srowen , @vanzin , @dbtsai , @felixcheung , @kiszk

@srowen (Member) commented Aug 21, 2019

If it's a correctness issue, I think we have to take the hit for now.

@tgravescs (Contributor)

+1; as discussed above, I think the performance impact is better than changing the entire behavior to something that could fail.

@dongjoon-hyun (Member) left a comment:

Thank you all.
Merged to master/2.4/2.3.

dongjoon-hyun pushed a commit that referenced this pull request Aug 21, 2019

[SPARK-28699][SQL] Disable using radix sort for ShuffleExchangeExec in repartition case

Closes #25491 from xuanyuanking/SPARK-28699-fix.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 2d9cc42)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
dongjoon-hyun pushed a commit that referenced this pull request Aug 21, 2019
@xuanyuanking (Member Author)

Thank you all for the discussion and guidance.
After discussing with Wenchen: once we finish the work of SPARK-25341 (support rolling back a shuffle map stage and regenerating the shuffle files), we'll come back to optimize this. The optimization strategy is to disable sortBeforeRepartition by default and only enable it when an indeterminate stage (in this case, the stage produced by repartition) is rerun.
Filed SPARK-28845 to track this.

@xuanyuanking deleted the SPARK-28699-fix branch August 22, 2019 03:06
rluta pushed a commit to rluta/spark that referenced this pull request Sep 17, 2019
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Sep 26, 2019
@tcondie (Contributor) commented Nov 7, 2019

@xuanyuanking could you please clarify your comment:

            // We are comparing binary here, which does not support radix sort.
            // See more details in SPARK-28699.

When the code seems to be sorting on the hashCode of the row:

          // The comparator for comparing row hashcode, which should always be Integer.
          val prefixComparator = PrefixComparators.LONG

After rolling this fix into HDInsight Spark, we are still seeing non-deterministic behavior in round-robin shuffles. My hunch is that it may have something to do with row hashCode collisions, but I have thus far not been able to produce a repro that can be shared externally (i.e., this is a customer-facing issue).

@cloud-fan (Contributor)

"radix sort" means we can sort by prefix (in this case hash code) directly. We must disable it here because same hash code doesn't mean same value.

I'm not sure why it's still non-deterministic in your workload, if hash code collides, spark would compare the row.
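
For example, a quick sketch using a well-known String hash collision (the same principle applies to row hash codes):

```scala
// Distinct values can share a hash code, so a sort prefix built from the hash
// alone cannot totally order rows; ties must fall back to comparing the full
// binary row, which rules out a pure radix sort on the prefix.
val a = "Aa"
val b = "BB"
assert(a.hashCode == b.hashCode) // both hash to 2112
assert(a != b)                   // yet the values differ
```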

@xuanyuanking (Member Author)

@tcondie Just as Wenchen said, the scenarios that can use radix sort must satisfy more conditions; you can check the logic in SortPrefixUtils.canSortFullyWithPrefix.
The non-deterministic behavior in round-robin shuffles always happens after a task failure, and a simple suite that reproduces it would be necessary for debugging.

@tcondie (Contributor) commented Nov 8, 2019

@xuanyuanking @cloud-fan Thanks for clarifying the semantics of the sort. I'm somewhat at a loss as to why we are still seeing non-deterministic behavior after this patch. I'll work on a repro that we can share publicly.

@tcondie (Contributor) commented Nov 8, 2019

I'm looking at the RecordBinaryComparator class and have some questions regarding its compare method. The following cases raised a possible concern:

    // check if stars align and we can get both offsets to be aligned
    if ((leftOff % 8) == (rightOff % 8)) {
      while ((leftOff + i) % 8 != 0 && i < leftLen) {
        final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
        final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
        if (v1 != v2) {
          return v1 > v2 ? 1 : -1;
        }
        i += 1;
      }
    }
    // for architectures that support unaligned accesses, chew it up 8 bytes at a time
    if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
      while (i <= leftLen - 8) {
        final long v1 = Platform.getLong(leftObj, leftOff + i);
        final long v2 = Platform.getLong(rightObj, rightOff + i);
        if (v1 != v2) {
          return v1 > v2 ? 1 : -1;
        }
        i += 8;
      }
    }

Let's say that during the first shuffle instance the offsets happen to be aligned, so the byte-wise comparison runs. Further assume that the v1 byte is all ones (11111111) and v2 is (01111111); then the integer comparison v1 > v2 is true (the bytes are masked with 0xff, so they compare as unsigned). In the second instance of the shuffle (i.e., after failure and stage rerun), the offsets are not aligned (because the records entered the system in a different order), so we instead drop down to the long comparison, which compares signed longs; with the sign bit set, v1 is negative, making v2 > v1 true.

This is just a theory, but I thought I'd share it for feedback. Is this a possible code path? Should we be using Long.compareUnsigned(v1, v2) instead of v1 > v2?
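
To make the signed-versus-unsigned point concrete, here is a small self-contained sketch (the values are hypothetical, chosen only to show that the two comparisons can disagree once the sign bit is set):

```scala
// When the high bit is set, signed and unsigned 64-bit comparisons disagree,
// so a byte-wise (unsigned) pass and a signed-long pass could order the same
// two records differently across reruns: the inconsistency theorized above.
val v1 = -1L            // 0xFFFFFFFFFFFFFFFF: the largest unsigned value
val v2 = Long.MaxValue  // 0x7FFFFFFFFFFFFFFF
assert(v1 < v2)                                    // signed: v1 sorts first
assert(java.lang.Long.compareUnsigned(v1, v2) > 0) // unsigned: v1 sorts last
```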

@srowen (Member) commented Nov 15, 2019

@tcondie Is this basically what you're saying? #26548 (comment)

That PR looks like it's addressing a slightly different but related issue; perhaps they can be addressed together.

@tcondie (Contributor) commented Nov 16, 2019

@srowen Indeed, this looks related to the situation we may be hitting.

mccheah pushed a commit to palantir/spark that referenced this pull request Feb 4, 2020
mccheah added a commit to palantir/spark that referenced this pull request Feb 4, 2020 (#640)