
[SPARK-48292][CORE] Revert [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status #46696

Closed
wants to merge 2 commits

Conversation

@AngersZhuuuu (Contributor) commented May 22, 2024

What changes were proposed in this pull request?

This reverts #36564, per the discussion in #36564 (comment).

When Spark commits a task, it commits to the committedTaskPath `${outputpath}/_temporary//${appAttempId}/${taskId}`.

In the case #36564 addressed, before #38980 each task attempt's job ID carried a different date, so when a task wrote its data successfully but failed to send back the TaskSuccess RPC, the re-run attempt committed to a different committedTaskPath, duplicating the data.

After #38980, different attempts of the same task share the same TaskId, so a re-run commits to the same committedTaskPath, and Hadoop's CommitProtocol handles this case so the data is not duplicated (see the sketch below).

Note: the taskAttemptPath still differs between attempts, since that path contains the taskAttemptId.
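
To make this concrete, here is a small illustrative sketch of the FileOutputCommitter v1 path layout described above. All IDs are hypothetical; only the structure matters.

```scala
// Illustrative sketch with hypothetical IDs: why a re-run attempt now
// commits to the same committedTaskPath as the first attempt.
val outputPath   = "/warehouse/t"
val appAttemptId = 0
val taskId       = "task_20240522101010_0001_m_000001"       // stable across attempts after #38980
val attempt0     = "attempt_20240522101010_0001_m_000001_0"  // first attempt
val attempt1     = "attempt_20240522101010_0001_m_000001_1"  // re-run after a lost TaskSuccess RPC

// Each attempt writes to its own taskAttemptPath...
val taskAttemptPath0 = s"$outputPath/_temporary/$appAttemptId/_temporary/$attempt0"
val taskAttemptPath1 = s"$outputPath/_temporary/$appAttemptId/_temporary/$attempt1"
// ...but both attempts commit (rename) to the same committedTaskPath:
val committedTaskPath = s"$outputPath/_temporary/$appAttemptId/$taskId"
```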

Why are the changes needed?

The abort-stage handling added in #36564 is no longer needed after #38980.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing UTs.

Was this patch authored or co-authored using generative AI tooling?

No

@AngersZhuuuu (Contributor, Author)

ping @cloud-fan

@github-actions bot added the CORE label May 22, 2024
```scala
// Regression test for SPARK-10381
val e = intercept[SparkException] {
  failAfter(Span(60, Seconds)) {
```
Contributor

shall we still check the error?

Contributor (Author)

It won't throw an error after the revert; the test now runs successfully.

@cloud-fan (Contributor)

Can we explain a bit more about why the issue is gone now?

@AngersZhuuuu (Contributor, Author)

> Can we explain a bit more about why the issue is gone now?

Added to the PR description.

@cloud-fan (Contributor)

> a re-run commits to the same committedTaskPath, and Hadoop's CommitProtocol handles this case so the data is not duplicated.

Will we hit a file-already-exists exception in this case?

@AngersZhuuuu (Contributor, Author)

> a re-run commits to the same committedTaskPath, and Hadoop's CommitProtocol handles this case so the data is not duplicated.
>
> Will we hit a file-already-exists exception in this case?

commitTask will overwrite the existing committedTaskPath, so it won't throw a file-already-exists exception.

[screenshot]
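
For readers following along, here is a minimal sketch of the v1 commitTask semantics being referenced, assuming Hadoop's FileOutputCommitter behavior; this is a paraphrase, not the actual Hadoop source.

```scala
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch of FileOutputCommitter v1 commitTask semantics: a committedTaskPath
// left by a previous successful commit is deleted before the rename, so a
// re-committed task overwrites it instead of failing with "file already exists".
def commitTaskV1(fs: FileSystem, taskAttemptPath: Path, committedTaskPath: Path): Unit = {
  if (fs.exists(committedTaskPath) && !fs.delete(committedTaskPath, true)) {
    throw new IOException(s"Could not delete $committedTaskPath")
  }
  if (!fs.rename(taskAttemptPath, committedTaskPath)) {
    throw new IOException(s"Could not rename $taskAttemptPath to $committedTaskPath")
  }
}
```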

@cloud-fan (Contributor)

can we also revert #46562 in this PR?

@dongjoon-hyun (Member) left a comment

+1, LGTM. Also, +1 to including the additional revert here.

cc @viirya

@viirya (Member) commented May 24, 2024

Looks good to me.

@AngersZhuuuu changed the title from "[SPARK-48292[CORE] Revert [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status" to "[SPARK-48292][CORE] Revert [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status" on May 29, 2024
@AngersZhuuuu (Contributor, Author)

> can we also revert #46562 in this PR?

Done

@github-actions bot added the SQL label May 29, 2024
@AngersZhuuuu (Contributor, Author)

GA passed, cc @cloud-fan

@cloud-fan (Contributor)

thanks, merging to master!

@cloud-fan closed this in f68d761 May 30, 2024
riyaverm-db pushed a commit to riyaverm-db/spark that referenced this pull request Jun 7, 2024
…inator should abort stage when committed file not consistent with task status


Closes apache#46696 from AngersZhuuuu/SPARK-48292.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@dongjoon-hyun (Member)

Hi @AngersZhuuuu, @viirya, @cloud-fan.
This seems to cause confusion due to the behavior difference in Apache Spark 3.4 and 3.5. Can we backport this to the old release branches?

@viirya (Member) commented Jul 1, 2024

> Hi @AngersZhuuuu, @viirya, @cloud-fan. This seems to cause confusion due to the behavior difference in Apache Spark 3.4 and 3.5. Can we backport this to the old release branches?

Sounds reasonable to me. Looks like #38980 is also merged into 3.4.

dongjoon-hyun pushed a commit to dongjoon-hyun/spark that referenced this pull request Jul 1, 2024
…inator should abort stage when committed file not consistent with task status

Closes apache#46696 from AngersZhuuuu/SPARK-48292.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
dongjoon-hyun pushed a commit that referenced this pull request Jul 1, 2024
…Coordinator should abort stage when committed file not consistent with task status

This is a backport of #46696

Closes #47166 from dongjoon-hyun/SPARK-48292.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
dongjoon-hyun pushed a commit that referenced this pull request Jul 1, 2024
…Coordinator should abort stage when committed file not consistent with task status

This is a backport of #46696

Closes #47168 from dongjoon-hyun/SPARK-48292-3.4.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@akki commented Aug 2, 2024

Hi all,

I am facing this issue after upgrading to Spark 3.5.1 and wonder if this revert would help me. Does anyone here know?

Many of our jobs are failing with "Authorized committer" errors, and we might have to revert our whole system back to Spark 3.3, which would be a lot of work. I am wondering if patching my Spark (to include this commit) would make these failures go away. I would appreciate it if anyone who closely understands this diff could confirm (or deny) my understanding.

Thanks!

@cloud-fan (Contributor)

This revert should fix your problem.

@dongjoon-hyun (Member)

To @akki: as mentioned by Wenchen, SPARK-48292 fixed it by reverting the old patch.

Please try to download and test your case with Apache Spark 3.5.2 RC4.

@akki commented Aug 4, 2024

Thanks for the replies, both. I'll try applying this patch.

I don't want to audit all the changes included in 3.5.2 RC4 at the moment, so I am leaning towards just reverting the earlier commit for now.

Appreciate the quick responses!

szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Aug 7, 2024
…Coordinator should abort stage when committed file not consistent with task status

This is a backport of apache#46696

Closes apache#47168 from dongjoon-hyun/SPARK-48292-3.4.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@lihao712

> a re-run commits to the same committedTaskPath, and Hadoop's CommitProtocol handles this case so the data is not duplicated.
>
> Will we hit a file-already-exists exception in this case?
>
> commitTask will overwrite the existing committedTaskPath, so it won't throw a file-already-exists exception. [screenshot]

As far as I know, after Hadoop 2.7 the task commit path algorithm is version 2, and the version 1 implementation performs poorly in practice. For tasks with a large number of files, under algorithm version 2, how can we ensure that there won't be two task commit files for the same partition existing simultaneously in the final directory?

@dongjoon-hyun (Member) commented Sep 19, 2024

> As far as I know, after Hadoop 2.7 the task commit path algorithm is version 2, and the version 1 implementation performs poorly in practice. For tasks with a large number of files, under algorithm version 2, how can we ensure that there won't be two task commit files for the same partition existing simultaneously in the final directory?

To @lihao712: Apache Spark (3.0.2+) uses version 1 by default (via SPARK-33019) due to the correctness issue of version 2 (MAPREDUCE-7282).
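
For anyone who wants to pin or verify the algorithm version explicitly, here is a sketch using the standard Hadoop config key passed through Spark; Spark 3.0.2+ already defaults to 1, so setting it here is purely illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Pin the Hadoop FileOutputCommitter algorithm to v1 explicitly
// (already the default in Spark 3.0.2+ via SPARK-33019).
val spark = SparkSession.builder()
  .appName("committer-v1-example")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
  .getOrCreate()
```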

@lihao712

> To @lihao712: Apache Spark (3.0.2+) uses version 1 by default (via SPARK-33019) due to the correctness issue of version 2 (MAPREDUCE-7282).

However, the performance of algorithm version 1 is significantly worse than that of version 2. Have you tested the performance of both algorithms in scenarios where a partition produces a large number of files? Additionally, has Hadoop made any optimizations to improve the performance of version 1?

@mridulm (Contributor) commented Sep 20, 2024

@lihao712, as @dongjoon-hyun mentioned above, v1 is used given the correctness issue of v2. Correctness takes precedence over performance.
