[SPARK-9899] [SQL] Fixes HadoopFsRelation speculative writes when used together with direct output committer #8191

Closed

Conversation

liancheng
Contributor

Hadoop output format classes call FileSystem.create() to create output files and set the overwrite argument to false. This causes trouble for speculative write tasks when a direct output committer is used, since previous task attempts may leave partial output files at the target path.

This PR tries to fix the issue by removing any existing output file when generating output file paths, which is equivalent to creating output files with the overwrite flag set to true.
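
A minimal sketch of the idea, not the exact patch (the helper name is hypothetical): before handing the path to the Hadoop output format, delete any stale file left by a previous attempt, which has the same effect as calling create() with overwrite = true.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper illustrating the idea behind this PR: remove any partial
// output left by an earlier (failed or speculative) attempt, so that the
// FileSystem.create(path) call made later by the Hadoop output format
// (with overwrite = false) does not fail with "File already exists".
def prepareOutputPath(rawPath: String, conf: Configuration): Path = {
  val path = new Path(rawPath)
  val fs: FileSystem = path.getFileSystem(conf)
  if (fs.exists(path)) {
    // Same effect as FileSystem.create(path, /* overwrite = */ true)
    fs.delete(path, /* recursive = */ false)
  }
  path
}
```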

@liancheng
Contributor Author

cc @marmbrus

@SparkQA

SparkQA commented Aug 14, 2015

Test build #40854 has finished for PR 8191 at commit a318645.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@marmbrus
Contributor

I have some concerns about the way this is implemented in the case of speculation. What happens in the following sequence of events?

  • Writer 1 starts
  • Speculative writer 2 deletes writer 1's file
  • Writer 1 finishes
  • Writer 2 aborts

Is there no file there now?

@liancheng
Contributor Author

@marmbrus However, in this case the job should be aborted since both the original task and the speculative task fail, so we shouldn't assume the written data files are intact.

@marmbrus
Contributor

I meant that it (writer 2) gets aborted because the first one completed.

@liancheng
Contributor Author

liancheng commented Aug 16, 2015

@marmbrus Had some offline discussions with @yhuai, and we believe that the real problem we hit in the job behind SPARK-9899 is SPARK-10005 (fixed by #8228).

Several known facts:

  1. The failed job we observed is a CTAS job
  2. First attempt of a task fails because of SPARK-10005
  3. Successive attempts of the same task fail because of the "File already exists" error
  4. When using S3, the FileSystem implementation first writes files to local disk and only uploads them when the writers are closed
  5. We are using a direct output committer, so different attempts of a task write to exactly the same output path

We suspect things probably happen in the following order:

  1. The CTAS job gets translated into an InsertIntoHadoopFsRelation physical plan, and a write job is issued
  2. The 1st attempt of a write task T1 is issued; it targets output path P, but first opens a local temporary file F
  3. T1 tries to read data from existing Parquet files and write rows to F
  4. T1 fails due to SPARK-10005 and aborts
  5. The output writer used by T1 is closed in abortTask, so F gets closed
  6. F gets uploaded to S3 as an empty/corrupted file, and now P exists (see the sketch after this list)
  7. Write task T2 is issued as the 2nd attempt of T1
  8. T2 tries to create the output file, but hits "File already exists" because of the existing S3 file P uploaded by T1
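
The upload in step 6 happens because closing the S3 output stream flushes the locally buffered file to the object store, even for a failed task. A rough sketch of the pattern (the class and method names here are illustrative, not the actual writer container code):

```scala
// Illustrative only: the shape of the task-side write path that produces step 6.
// Closing the writer closes the underlying S3 output stream, which uploads
// whatever was buffered on local disk, even though the task has failed.
trait OutputWriter {
  def write(row: Any): Unit
  def close(): Unit // for S3, close() triggers the upload of the locally buffered file
}

class WriterContainer(newWriter: () => OutputWriter) {
  def executeTask(rows: Iterator[Any]): Unit = {
    val writer = newWriter()
    try {
      rows.foreach(writer.write) // may throw, e.g. because of SPARK-10005
      commitTask(writer)
    } catch {
      case t: Throwable =>
        abortTask(writer) // still closes the writer ...
        throw t
    }
  }

  private def commitTask(writer: OutputWriter): Unit = writer.close()

  private def abortTask(writer: OutputWriter): Unit =
    writer.close() // ... so the partial file is uploaded to the final path P anyway
}
```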

The hard part is that we can't ...

  1. ... delete the target file at the beginning of a task (as this PR does), because of the very case you mentioned.
  2. ... use FileSystem.create(path, true) to overwrite the output file, since it's equivalent to 1.
  3. ... delete the output file in abortTask() when a task fails, because failed speculative tasks may delete properly committed output files written by other successful attempt(s).

What makes things worse, consider the following case:

  1. Write task T1 gets issued, and executes slowly
  2. Task T2 gets issued as a speculative task of T1
  3. T1 succeeds, and its output file gets uploaded to S3 in commitTask()
  4. T2 fails, and its partially written temporary output file gets uploaded to S3 in abortTask()

Now correct data is overwritten by corrupted data.

The TL;DR is that S3 + direct output committer + speculation is a REALLY bad combination. Currently I don't see a solution that fixes all the cases mentioned above. My suggestion is to deprecate this combination by checking whether speculation is enabled when a direct output committer is used, e.g. along the lines of the sketch below.
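
A sketch of that suggestion, under stated assumptions: `spark.speculation` is a real configuration key, but the `isDirectOutput` flag below is an illustrative placeholder (Spark has no such flag, which is part of why the follow-up change falls back to conservative behavior instead).

```scala
import org.apache.spark.SparkConf

// Sketch of the suggested guard: refuse to run a write job that uses a direct
// output committer while speculation is enabled.
def assertCommitterIsSafe(conf: SparkConf, committerClass: Class[_], isDirectOutput: Boolean): Unit = {
  val speculationEnabled = conf.getBoolean("spark.speculation", false)
  if (speculationEnabled && isDirectOutput) {
    throw new IllegalStateException(
      s"Direct output committer ${committerClass.getName} cannot be used when " +
        "speculation is enabled (spark.speculation=true): failed speculative attempts " +
        "may overwrite or delete output committed by successful attempts.")
  }
}
```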

@liancheng
Contributor Author

Closing this one since it doesn't solve the real issue and introduces more problems.

@liancheng liancheng closed this Aug 17, 2015
@liancheng liancheng deleted the spark-9899/speculative-write branch August 17, 2015 09:44
asfgit pushed a commit that referenced this pull request Aug 19, 2015
…ion is on

Speculation hates direct output committer, as there are multiple corner cases that may cause data corruption and/or data loss.

Please see this [PR comment] [1] for more details.

[1]: #8191 (comment)

Author: Cheng Lian <lian@databricks.com>

Closes #8317 from liancheng/spark-9899/speculation-hates-direct-output-committer.
asfgit pushed a commit that referenced this pull request Aug 19, 2015
…ion is on

Speculation hates direct output committer, as there are multiple corner cases that may cause data corruption and/or data loss.

Please see this [PR comment] [1] for more details.

[1]: #8191 (comment)

Author: Cheng Lian <lian@databricks.com>

Closes #8317 from liancheng/spark-9899/speculation-hates-direct-output-committer.

(cherry picked from commit f3ff4c4)
Signed-off-by: Michael Armbrust <michael@databricks.com>

Conflicts:
	sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
asfgit pushed a commit that referenced this pull request Sep 14, 2015
…lation enabled

This is a follow-up of #8317.

When speculation is enabled, there may be multiple tasks writing to the same path. Generally it's OK, as we write to a temporary directory first and only one task can commit the temporary directory to the target path.

However, when we use a direct output committer, tasks write data to the target path directly, without a temporary directory. This causes problems like corrupted data. Please see [PR comment](#8191 (comment)) for more details.

Unfortunately, we don't have a simple flag to tell whether an output committer will write to a temporary directory or not, so for safety we have to disable any customized output committer when `speculation` is true.

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #8687 from cloud-fan/direct-committer.
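
For reference, a rough sketch of the fallback behavior described in that commit message (the wiring, and the assumption that a custom committer exposes a `(Path, TaskAttemptContext)` constructor, are illustrative rather than the exact Spark code): when speculation is enabled, ignore any user-configured committer and use the default FileOutputCommitter, which stages data in a temporary directory and only moves it to the final path on a successful commit.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}

// When speculation is on, fall back to the safe, temporary-directory-based
// committer instead of any user-specified (possibly direct) committer.
def chooseCommitter(
    speculationEnabled: Boolean,
    userCommitterClass: Option[Class[_ <: FileOutputCommitter]],
    context: TaskAttemptContext): FileOutputCommitter = {
  val outputPath = FileOutputFormat.getOutputPath(context)
  userCommitterClass match {
    case Some(cls) if !speculationEnabled =>
      // Assumes the custom committer has a (Path, TaskAttemptContext) constructor.
      cls.getConstructor(classOf[Path], classOf[TaskAttemptContext])
        .newInstance(outputPath, context)
    case _ =>
      // Safe default: write to a temporary directory, rename into place on commit.
      new FileOutputCommitter(outputPath, context)
  }
}
```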