
[SPARK-41448] Make consistent MR job IDs in FileBatchWriter and FileFormatWriter #38980

Closed
wants to merge 1 commit into apache:master from boneanxs:SPARK-41448

Conversation

@boneanxs commented Dec 8, 2022

What changes were proposed in this pull request?

Make consistent MR job IDs in FileBatchWriter and FileFormatWriter

Why are the changes needed?

SPARK-26873 fixed the job ID consistency issue for FileFormatWriter, but SPARK-33402 broke this requirement by introducing a random long. We need to address this because the commit protocol expects identical task IDs across attempts for correctness.

Also, FileBatchWriter doesn't follow this requirement, so it needs to be fixed as well.
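
For context, a minimal sketch of the two ID styles involved (illustrative shapes, not the exact Spark code):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

// The style this PR removes: a raw epoch-millis instant; IDs derived from it
// can differ between the places (and task attempts) that recompute it.
val jobIdInstant: Long = new Date().getTime

// The consistent style: one second-granularity job tracker ID, created once
// on the driver and passed down, so every task attempt derives the same job ID.
val jobTrackerID: String =
  new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date())
```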

Does this PR introduce any user-facing change?

no

How was this patch tested?

@boneanxs (Author) commented Dec 9, 2022

@cloud-fan @rdblue @dongjoon-hyun @steveloughran could you please take a look?

@steveloughran (Contributor) left a comment

looks ok from the perspective of someone who has written some of the committers but isn't confident they understand all the nuances of spark commit protocols (i never knew that FileBatchWriter added a third way to try and commit non-stream jobs...)

```scala
  }

  /**
   * Create a job ID.
```

how about extending the comment here by noting that the job id needs to be unique across all jobs (linking to SPARK-33402) and generated consistently across the places it is used (SPARK-26873 + SPARK-41448)? That way, whoever next goes near the code knows what is needed.
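
A sketch of what that extended comment might look like (illustrative wording; the helper shape mirrors SparkHadoopWriterUtils but should be treated as an assumption):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import org.apache.hadoop.mapreduce.JobID

/**
 * Create a job ID.
 *
 * Note: the job ID must be unique across all jobs (SPARK-33402) and must be
 * generated consistently in every place that derives IDs for the same write
 * (SPARK-26873, SPARK-41448), since task attempts rely on identical job and
 * task IDs to commit to the same paths.
 */
def createJobID(time: Date, id: Int): JobID =
  new JobID(createJobTrackerID(time), id)

def createJobTrackerID(time: Date): String =
  new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
```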

```diff
@@ -201,14 +201,14 @@ object FileFormatWriter extends Logging {
       rdd
     }

-    val jobIdInstant = new Date().getTime
+    val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
```

i must have missed this - but also we've not had any reports of the timestamp clash surfacing.

that's probably because the s3a and abfs/gcs committers all pick up the uuid in "spark.sql.sources.writeJobUUID" in preference to anything else, and those are generated uniquely
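
For illustration, the preference described here might look like this on the committer side (hypothetical helper; real committers such as the s3a and manifest committers implement this internally):

```scala
import org.apache.hadoop.conf.Configuration

// Prefer the Spark-provided per-write UUID over anything derived from the
// timestamp-based job ID, which can clash for jobs started in the same second.
def jobUniqueId(conf: Configuration, jobIdBasedId: String): String =
  Option(conf.get("spark.sql.sources.writeJobUUID")).getOrElse(jobIdBasedId)
```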

```diff
@@ -29,6 +29,9 @@ import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWri
 case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {

+  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
```


I wonder if a uuid should be created here which is then passed down in that "spark.sql.sources.writeJobUUID" option in createTaskAttemptContext(). It would be consistent with the rest and the mapreduce manifest committer would pick it up. (so would the s3a one, but as you can't do dynamic partitioning there it's less relevant)
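
A minimal sketch of that suggestion (the option name comes from the discussion above; the surrounding shape is an assumption, not the actual FileWriterFactory code):

```scala
import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

object WriteJobUuidSketch {
  // Created once per write job, so every task attempt observes the same value.
  val jobUUID: String = UUID.randomUUID().toString

  def createTaskAttemptContext(
      hadoopConf: Configuration,
      taskAttemptId: TaskAttemptID): TaskAttemptContextImpl = {
    // Committers that understand this option (manifest, s3a) pick it up in
    // preference to anything derived from the timestamp-based job ID.
    hadoopConf.set("spark.sql.sources.writeJobUUID", jobUUID)
    new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
  }
}
```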

@AmplabJenkins commented:

Can one of the admins verify this patch?

@cloud-fan (Contributor) left a comment

good catch!

cloud-fan closed this in 7801666 Dec 12, 2022
cloud-fan pushed a commit that referenced this pull request Dec 12, 2022
[SPARK-41448] Make consistent MR job IDs in FileBatchWriter and FileFormatWriter

Closes #38980 from boneanxs/SPARK-41448.

Authored-by: Hui An <hui.an@shopee.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 7801666)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Dec 12, 2022
[SPARK-41448] Make consistent MR job IDs in FileBatchWriter and FileFormatWriter
@cloud-fan (Contributor) commented:

thanks, merging to master/3.3/3.2

beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022
[SPARK-41448] Make consistent MR job IDs in FileBatchWriter and FileFormatWriter
HyukjinKwon added a commit that referenced this pull request Dec 23, 2022
…riter.executeTask

### What changes were proposed in this pull request?

This PR is a follow-up of #38939 that fixes a logical conflict introduced when merging PRs; see #38980 and #38939.

### Why are the changes needed?

To recover the broken build.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually tested:

```
 ./build/sbt -Phive clean package
```

Closes #39194 from HyukjinKwon/SPARK-41407.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
```diff
@@ -29,6 +29,9 @@ import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWri
 case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {

+  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
```

@boneanxs It looks like the jobId is not serializable?


it does implement WritableComparable; you could use WritableConverter to do the marshalling if you don't just want to do it via a string
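
For illustration, a minimal sketch of the string round-trip option (JobID.forName is the standard Hadoop parser):

```scala
import org.apache.hadoop.mapreduce.JobID

// A JobID renders as e.g. "job_20221208123456_0000"; parsing it back yields
// an equal instance, so a plain String field serializes with the factory and
// can be rebuilt on the executor without any Writable machinery.
val jobId = new JobID("20221208123456", 0)
val wire: String = jobId.toString
val restored: JobID = JobID.forName(wire)
assert(restored == jobId)
```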

sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023
[SPARK-41448] Make consistent MR job IDs in FileBatchWriter and FileFormatWriter
cloud-fan pushed a commit that referenced this pull request May 30, 2024
Revert [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

### What changes were proposed in this pull request?
Revert #36564, per the discussion at #36564 (comment).

When Spark commits a task, it commits to the committedTaskPath
`${outputpath}/_temporary/${appAttemptId}/${taskId}`.
So in #36564's case: before #38980, each task attempt's job ID date was not the same, so when a task wrote its data successfully but failed to send back the TaskSuccess RPC, the re-run task would commit to a different committedTaskPath, causing duplicated data.

After #38980, different attempts of the same task share the same TaskId, so a re-run task commits to the same committedTaskPath, and the Hadoop commit protocol handles that case so data is not duplicated.

Note: the taskAttemptPath is still unique per attempt, since that path contains the taskAttemptId.
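
For illustration, the path layout described above (hypothetical helpers reflecting the description, not the actual commit-protocol code):

```scala
// Committed-task path: shared by every attempt of the same task, so once job
// and task IDs are stable, a re-run commits to the same location.
def committedTaskPath(output: String, appAttemptId: Int, taskId: String): String =
  s"$output/_temporary/$appAttemptId/$taskId"

// Task-attempt path: still unique per attempt, because it embeds the
// taskAttemptId.
def taskAttemptPath(output: String, appAttemptId: Int, taskAttemptId: String): String =
  s"$output/_temporary/$appAttemptId/_temporary/$taskAttemptId"
```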

### Why are the changes needed?
Not needed anymore.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46696 from AngersZhuuuu/SPARK-48292.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
boneanxs deleted the SPARK-41448 branch June 3, 2024 06:30
riyaverm-db pushed a commit to riyaverm-db/spark that referenced this pull request Jun 7, 2024
Revert [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status