Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.1] Shuffle+Repartition on a DataFrame could lead to incorrect answers #22211

Closed
wants to merge 2 commits into from

Conversation

henryr
Copy link
Contributor

@henryr henryr commented Aug 23, 2018

What changes were proposed in this pull request?

Back port of #20393 and #22079.

Currently shuffle repartition uses RoundRobinPartitioning, the generated result is nondeterministic since the sequence of input rows are not determined.

The bug can be triggered when there is a repartition call following a shuffle (which would lead to non-deterministic row ordering), as the pattern shows below:
upstream stage -> repartition stage -> result stage
(-> indicate a shuffle)
When one of the executors process goes down, some tasks on the repartition stage will be retried and generate inconsistent ordering, and some tasks of the result stage will be retried generating different data.

The following code returns 931532, instead of 1000000:
```
import scala.sys.process._

import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
```

In this PR, we propose a most straight-forward way to fix this problem by performing a local sort before partitioning, after we make the input row ordering deterministic, the function from rows to partitions is fully deterministic too.

The downside of the approach is that with extra local sort inserted, the performance of repartition() will go down, so we add a new config named `spark.sql.execution.sortBeforeRepartition` to control whether this patch is applied. The patch is default enabled to be safe-by-default, but user may choose to manually turn it off to avoid performance regression.

This patch also changes the output rows ordering of repartition(), that leads to a bunch of test cases failure because they are comparing the results directly.

Add unit test in ExchangeSuite.

With this patch(and `spark.sql.execution.sortBeforeRepartition` set to true), the following query returns 1000000:
```
import scala.sys.process._

import org.apache.spark.TaskContext

spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")

val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()

res7: Long = 1000000
```

Author: Xingbo Jiang <xingbo.jiangdatabricks.com>

…1] Shuffle+Repartition on a DataFrame could lead to incorrect answers

## What changes were proposed in this pull request?

Back port of apache#20393 and apache#22079.

Currently shuffle repartition uses RoundRobinPartitioning, the generated result is nondeterministic since the sequence of input rows are not determined.

The bug can be triggered when there is a repartition call following a shuffle (which would lead to non-deterministic row ordering), as the pattern shows below:
upstream stage -> repartition stage -> result stage
(-> indicate a shuffle)
When one of the executors process goes down, some tasks on the repartition stage will be retried and generate inconsistent ordering, and some tasks of the result stage will be retried generating different data.

The following code returns 931532, instead of 1000000:
```
import scala.sys.process._

import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
```

In this PR, we propose a most straight-forward way to fix this problem by performing a local sort before partitioning, after we make the input row ordering deterministic, the function from rows to partitions is fully deterministic too.

The downside of the approach is that with extra local sort inserted, the performance of repartition() will go down, so we add a new config named `spark.sql.execution.sortBeforeRepartition` to control whether this patch is applied. The patch is default enabled to be safe-by-default, but user may choose to manually turn it off to avoid performance regression.

This patch also changes the output rows ordering of repartition(), that leads to a bunch of test cases failure because they are comparing the results directly.

Add unit test in ExchangeSuite.

With this patch(and `spark.sql.execution.sortBeforeRepartition` set to true), the following query returns 1000000:
```
import scala.sys.process._

import org.apache.spark.TaskContext

spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")

val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()

res7: Long = 1000000
```

Author: Xingbo Jiang <xingbo.jiangdatabricks.com>

## How was this patch tested?

Ran all SBT unit tests for org.apache.spark.sql.*.

Ran pyspark tests for module pyspark-sql.

Closes apache#22079 from bersprockets/SPARK-23207.

Lead-authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Co-authored-by: Bruce Robbins <bersprockets@gmail.com>
Co-authored-by: Zheng RuiFeng <ruifengz@foxmail.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
@henryr henryr changed the title [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.… [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.1] Shuffle+Repartition on a DataFrame could lead to incorrect answers Aug 23, 2018
@SparkQA
Copy link

SparkQA commented Aug 23, 2018

Test build #95186 has finished for PR 22211 at commit 5bdd9e3.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public final class RecordBinaryComparator extends RecordComparator

@SparkQA
Copy link

SparkQA commented Aug 24, 2018

Test build #95189 has finished for PR 22211 at commit d269015.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

cc @jiangxb1987

@jiangxb1987
Copy link
Contributor

LGTM

asfgit pushed a commit that referenced this pull request Aug 27, 2018
…1] Shuffle+Repartition on a DataFrame could lead to incorrect answers

## What changes were proposed in this pull request?

    Back port of #20393 and #22079.

    Currently shuffle repartition uses RoundRobinPartitioning, the generated result is nondeterministic since the sequence of input rows are not determined.

    The bug can be triggered when there is a repartition call following a shuffle (which would lead to non-deterministic row ordering), as the pattern shows below:
    upstream stage -> repartition stage -> result stage
    (-> indicate a shuffle)
    When one of the executors process goes down, some tasks on the repartition stage will be retried and generate inconsistent ordering, and some tasks of the result stage will be retried generating different data.

    The following code returns 931532, instead of 1000000:
    ```
    import scala.sys.process._

    import org.apache.spark.TaskContext
    val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
      x
    }.repartition(200).map { x =>
      if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
        throw new Exception("pkill -f java".!!)
      }
      x
    }
    res.distinct().count()
    ```

    In this PR, we propose a most straight-forward way to fix this problem by performing a local sort before partitioning, after we make the input row ordering deterministic, the function from rows to partitions is fully deterministic too.

    The downside of the approach is that with extra local sort inserted, the performance of repartition() will go down, so we add a new config named `spark.sql.execution.sortBeforeRepartition` to control whether this patch is applied. The patch is default enabled to be safe-by-default, but user may choose to manually turn it off to avoid performance regression.

    This patch also changes the output rows ordering of repartition(), that leads to a bunch of test cases failure because they are comparing the results directly.

    Add unit test in ExchangeSuite.

    With this patch(and `spark.sql.execution.sortBeforeRepartition` set to true), the following query returns 1000000:
    ```
    import scala.sys.process._

    import org.apache.spark.TaskContext

    spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")

    val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
      x
    }.repartition(200).map { x =>
      if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
        throw new Exception("pkill -f java".!!)
      }
      x
    }
    res.distinct().count()

    res7: Long = 1000000
    ```

    Author: Xingbo Jiang <xingbo.jiangdatabricks.com>

Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Author: Henry Robinson <henry@apache.org>

Closes #22211 from henryr/spark-23207-branch-2.1.
@gatorsmile
Copy link
Member

Thanks! Merged to 2.1

@dongjoon-hyun
Copy link
Member

Hi, @henryr . Since this is merged, could you close this PR?

@henryr
Copy link
Contributor Author

henryr commented Sep 11, 2018

Merged to 2.1

@henryr henryr closed this Sep 11, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants