[SPARK-33019][CORE] Use spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 by default #29895

Closed

Conversation

dongjoon-hyun
Member

dongjoon-hyun commented Sep 28, 2020

What changes were proposed in this pull request?

Apache Spark 3.1's default Hadoop profile is `hadoop-3.2`. Instead of only documenting a warning, this PR aims to use the consistent and safer version of the Apache Hadoop file output committer algorithm, `v1`. This prevents a silent correctness regression during migration from Apache Spark 2.4/3.0 to Apache Spark 3.1.0. Of course, a user-provided configuration such as `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2` will still be honored.
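
For illustration, a minimal sketch (not from this PR; the app name is made up) of how a user keeps v2 explicitly, since any `spark.hadoop.*` key is copied into the Hadoop `Configuration` used by the committers:

```scala
import org.apache.spark.sql.SparkSession

// Explicitly opt back into the v2 committer despite the new v1 default.
// Every "spark.hadoop.*" entry is forwarded to the Hadoop Configuration.
val spark = SparkSession.builder()
  .appName("committer-v2-opt-in") // illustrative name
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()
```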

Why are the changes needed?

Apache Spark provides multiple distributions, with Hadoop 2.7 and Hadoop 3.2. The default of `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version` depends on the Hadoop version: Apache Hadoop 3.0 switched the default algorithm from `v1` to `v2`, and there is now a discussion to remove `v2`. We should provide a consistent default behavior of `v1` across the various Spark distributions.

  • MAPREDUCE-7282 MR v2 commit algorithm should be deprecated and not the default

Does this PR introduce any user-facing change?

Yes. This changes the default behavior. Users can override this conf.

How was this patch tested?

Manual.

**BEFORE (spark-3.0.1-bin-hadoop3.2)**

```scala
scala> sc.version
res0: String = 3.0.1

scala> sc.hadoopConfiguration.get("mapreduce.fileoutputcommitter.algorithm.version")
res1: String = 2
```

**AFTER**

```scala
scala> sc.hadoopConfiguration.get("mapreduce.fileoutputcommitter.algorithm.version")
res0: String = 1
```
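
As a hypothetical automated check of the new default (not part of this PR's test suite; master URL and app name are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// With no explicit user setting, the context should now report v1.
val sc = new SparkContext(
  new SparkConf().setMaster("local[1]").setAppName("committer-default-check"))
assert(sc.hadoopConfiguration.get("mapreduce.fileoutputcommitter.algorithm.version") == "1")
sc.stop()
```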

@SparkQA

SparkQA commented Sep 28, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33808/

@SparkQA

SparkQA commented Sep 28, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33808/

@SparkQA

SparkQA commented Sep 28, 2020

Test build #129193 has finished for PR 29895 at commit 1d4d3ea.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Sep 28, 2020

GitHub Action passed. The Jenkins failure is irrelevant.

BTW, cc @waleedfateem, @srowen, @HyukjinKwon, @wangyum for #29541

@dongjoon-hyun
Member Author

cc @dbtsai, @viirya, @sunchao

@dbtsai
Member

dbtsai commented Sep 28, 2020

also cc @steveloughran

@HyukjinKwon
Member

Looks fine to me, but what do you think, @steveloughran? It looks like your call is the important one here.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Sep 29, 2020

I labeled SPARK-33019 as a correctness issue because a working Apache Spark 2.4 PySpark program can generate a wrong result on Apache Spark 3.0 with the Hadoop 3.2 distribution or on the Apache Spark 3.1 default distribution. This is a release blocker for Apache Spark 3.1. Note that this is a no-op when the user provides the conf.

Comment on lines -1767 to -1773
Version 2 may have better performance, but version 1 may handle failures better in certain situations,
as per <a href="https://issues.apache.org/jira/browse/MAPREDUCE-4815">MAPREDUCE-4815</a>.
The default value depends on the Hadoop version used in an environment:
1 for Hadoop versions lower than 3.0
2 for Hadoop versions 3.0 and higher
It's important to note that this can change back to 1 again in the future once <a href="https://issues.apache.org/jira/browse/MAPREDUCE-7282">MAPREDUCE-7282</a>
is fixed and merged.


Just curious why this is deleted? It is a very comprehensive comment about the Hadoop version background. @dongjoon-hyun

Member Author

This PR aims to provide a consistent view for Apache Spark users. For example, "The default value depends on the Hadoop version used in an environment" is no longer valid. After this PR, Apache Spark users will get `v1` consistently by default.

@steveloughran
Copy link
Contributor

FWIW I'm going to change the default to be v1, and log @ WARN in job setup when you use v2 (unless you turn that specific log off). V2 is used in places where people have hit the scale limits of v1, and they are happy with the risk of failures. Note that if your job doesn't generate unique files for each task attempt, the output is correct even without atomic task commit. The danger is when you get one or more of:

  • different task attempts generating files with different names
  • a requirement that all output files of a task consist entirely and exclusively of the output of a single task attempt

If your attempts are 100% deterministic, you are going to be safe.
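
A sketch of the risky case described above (illustrative only; the output path is made up): a job whose bytes differ between attempts of the same task, which v2's non-atomic task commit can mix across attempts:

```scala
import scala.util.Random

// Each task attempt draws fresh random values, so a retried task can write
// different bytes than its failed first attempt. Under the v2 committer,
// files promoted by a partially committed attempt may survive alongside
// the retry's files.
val unsafe = sc.parallelize(1 to 1000, 8)
  .map(i => (i, Random.nextLong())) // nondeterministic per attempt
unsafe.saveAsTextFile("/tmp/v2-unsafe-example") // illustrative path
```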

@tgravescs
Contributor

https://issues.apache.org/jira/browse/MAPREDUCE-7282 is not yet resolved, so I think we should wait for resolution there. I don't remember the details off the top of my head, so I need to go look again.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Sep 29, 2020

Hi, @steveloughran and @tgravescs .

No matter what happens in the future, that cannot change history (Apache Hadoop 3.2.0 and all existing Hadoop 3.x versions). And, for now, Apache Spark 3.1 will be stuck on Apache Hadoop 3.2.0 due to the Guava issue. That is why we need to do this right now on the Spark side.

For the following, @steveloughran: as I wrote in the PR description, this PR doesn't override an explicit user-given config. It only sets v1 when there is no explicit setting.

> V2 is used in places where people have hit the scale limits with v1, and they are happy with the risk of failures.

Eventually, I believe we can use hadoop-client-runtime only in order to remove the Guava dependency (#29843) and consume @steveloughran's new Hadoop release in the future. Until then, Apache Spark 3.1 had better provide a migration with no known correctness regressions. If the Apache Spark 3.1 default distribution is unsafe due to a third party (in this case Hadoop), how can we recommend it to users?

@steveloughran
Contributor

Patch LGTM: you are changing the default algorithm to v1 if the user doesn't say otherwise.

I'm sorry about "the guava problem"... something to discuss there. It's just that there were some security fixes we needed to get in, and we couldn't stay on older versions. FWIW we are removing the Preconditions checks from hadoop-common entirely and moving to our own, just to avoid grief there, but other bits (executors, cache, ...) will still be used. What a pain.

@tgravescs
Contributor

I'm fine with changing the default. I was trying to figure out the cases where a user would really see this.

The MapReduce paradigm and Spark rely on the output of tasks being deterministic; if it is not, they have other issues with retries and the output has no guarantees. I thought Spark had deterministic output path naming, but I was going to check to make sure I was remembering properly.

If those are true, I think that just leaves the _SUCCESS file, which I can see would be a problem if people don't check for it.
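
To make the _SUCCESS point concrete, a hedged sketch (the path is illustrative) of a consumer that refuses to read output without the marker:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// FileOutputCommitter writes an empty _SUCCESS marker only after a
// successful job commit, so consumers can check it before reading.
val out = new Path("/tmp/job-output") // illustrative path
val fs = out.getFileSystem(sc.hadoopConfiguration)
require(fs.exists(new Path(out, "_SUCCESS")),
  s"Output at $out has no _SUCCESS marker; the job may not have committed")
```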

Are there cases I'm missing here? Are there cases where cloud providers or other tools change the output paths or something? @steveloughran, did you see this in a particular situation?

@dongjoon-hyun
Member Author

Thank you, @steveloughran and @tgravescs .

@dongjoon-hyun
Member Author

dongjoon-hyun commented Sep 29, 2020

@tgravescs, Apache Spark's official cloud integration document is here. We are already recommending v1 for safety. With this PR, Apache Spark 3.1 (default Hadoop 3.2) can be as safe as Apache Spark 3.0 (default Hadoop 2.7).

> For object stores whose consistency model means that rename-based commits are safe use the FileOutputCommitter v2 algorithm for performance; v1 for safety.

@dongjoon-hyun
Member Author

I'll merge this PR. Thanks!

dongjoon-hyun added a commit that referenced this pull request Sep 29, 2020
[SPARK-33019][CORE] Use spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 by default

Closes #29895 from dongjoon-hyun/SPARK-DEFAUT-COMMITTER.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit cc06266)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@dongjoon-hyun
Member Author

Merged to master/3.0.

holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
[SPARK-33019][CORE] Use spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 by default

Closes apache#29895 from dongjoon-hyun/SPARK-DEFAUT-COMMITTER.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit cc06266)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>