
[SPARK-39622][SQL][TESTS] Add additional error message matching for SPARK-7837 Do not close output writer twice when commitTask() fail #37245

Closed
wants to merge 4 commits

Conversation

@LuciferYang (Contributor) commented Jul 21, 2022

What changes were proposed in this pull request?

This PR adds additional error message assertions to fix the flaky test `SPARK-7837 Do not close output writer twice when commitTask() fail` in `ParquetIOSuite`.

Why are the changes needed?

Before SPARK-39195, the test `SPARK-7837 Do not close output writer twice when commitTask() fail` only needed to handle the `TaskSetFailed` event, because `maxTaskFailures` is 1 in local mode.
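As an aside (a hedged illustration, not part of this PR's change): plain `local`/`local[N]` masters never retry a failed task, while Spark's documented `local[N, maxFailures]` master form raises that limit:

```scala
// Hedged sketch, not part of this PR: in plain local mode maxTaskFailures is 1,
// so a failed task immediately fails the task set. The documented
// `local[N, maxFailures]` master form allows retries.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("max-task-failures-demo")  // hypothetical app name
  .master("local[2, 2]")              // 2 worker threads, up to 2 task failures
  .getOrCreate()
```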

After SPARK-39195, however, when `OutputCommitCoordinator#taskCompleted` finds `stageState.authorizedCommitters(partition) == taskId`, it posts a `StageFailed` event instead of only logging at debug level. The flaky test may therefore handle either the `TaskSetFailed` event or the `StageFailed` event, and since the execution order of the two events is nondeterministic, there may be the following two kinds of logs:

- Scenario 1 (Success)

18:47:51.592 ERROR org.apache.spark.sql.execution.datasources.FileFormatWriter: Aborting job 59e24bb4-d3e3-41f6-aa80-411ddb481362.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (localhost executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:596)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:334)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$12(FileFormatWriter.scala:242)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1490)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Intentional exception for testing purposes
	at scala.sys.package$.error(package.scala:30)
	at org.apache.spark.sql.execution.datasources.parquet.TaskCommitFailureParquetOutputCommitter.commitTask(ParquetIOSuite.scala:1552)
	at org.apache.spark.mapred.SparkHadoopMapRedUtil$.$anonfun$commitTask$1(SparkHadoopMapRedUtil.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:619)
	at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:51)
	at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:78)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:279)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.$anonfun$commit$1(FileFormatDataWriter.scala:107)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:619)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:107)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:318)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1524)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:324)
	... 9 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2706)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2641)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2641)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1189)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1189)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1189)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2897)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2836)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2825)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2222)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:189)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)

- Scenario 2 (Failure)

18:49:14.145 ERROR org.apache.spark.sql.execution.datasources.FileFormatWriter: Aborting job 82a0051d-7fe4-4632-8c9c-78dfbfaf5819.
org.apache.spark.SparkException: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=1, partition=0) failed; but task commit success, data duplication may happen.
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2706)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2641)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2641)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleStageFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleStageFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleStageFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2894)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2836)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2825)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2222)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:189)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)

If `maxTaskFailures` were changed to >= 2, Scenario 2 would occur consistently; but the flaky test runs in local mode, where `maxTaskFailures` is always 1, so this PR just adds additional assertions as a workaround.
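A minimal sketch of the OR-matching workaround (not the literal diff; `df` and `path` stand in for the suite's existing fixtures, and the committer is assumed to throw in `commitTask()`):

```scala
// Minimal sketch of the OR-matching assertion; `df` and `path` are placeholders
// for the suite's existing fixtures.
import org.apache.spark.SparkException
import org.scalatest.Assertions.{assert, intercept}

val e = intercept[SparkException] {
  df.write.parquet(path) // triggers the committer that throws in commitTask()
}
// Collect the top-level message and, if present, the cause's message.
val messages = Seq(e.getMessage) ++ Option(e.getCause).map(_.getMessage)
// Accept whichever event the DAGScheduler happened to process first.
assert(messages.exists(m =>
  m.contains("Intentional exception for testing purposes") || // Scenario 1
    m.contains("data duplication may happen")))               // Scenario 2
```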

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Pass GitHub Actions

@github-actions bot added the SQL label Jul 21, 2022
@LuciferYang changed the title: [SPARK-39622][CORE][TESTS] Fix flaky test SPARK-7837 Do not close output writer twice when commitTask() fail in ParquetIOSuite → [SPARK-39622][SQL][TESTS] Fix flaky test SPARK-7837 Do not close output writer twice when commitTask() fail in ParquetIOSuite (Jul 21, 2022)
@LuciferYang (Contributor, Author) commented:

cc @HyukjinKwon

@LuciferYang (Contributor, Author) commented:

also cc @srowen @dongjoon-hyun

@HyukjinKwon (Member) left a comment:

Awesome!

@LuciferYang (Contributor, Author) commented:

c97a71a merges SPARK-39831 to make GA pass.

@dongjoon-hyun (Member) left a comment:

+1, LGTM code-wise.

For the title, the current one is proper for a JIRA title, but a little insufficient for a PR title. Could you revise the PR title a little more toward the proposed solution (the newly handled error message)?

@LuciferYang changed the title: [SPARK-39622][SQL][TESTS] Fix flaky test SPARK-7837 Do not close output writer twice when commitTask() fail in ParquetIOSuite → [SPARK-39622][SQL][TESTS] Add additional error message matching for SPARK-7837 Do not close output writer twice when commitTask() fail (Jul 21, 2022)
@LuciferYang (Contributor, Author) commented:

Changed the title to

Add additional error message matching for SPARK-7837 Do not close output writer twice when commitTask() fail

@dongjoon-hyun Do you think this is OK?

@dongjoon-hyun (Member) commented:

Thank you. Looks much better!

@LuciferYang (Contributor, Author) commented:

The Run / Run TPC-DS queries with SF=1 job failed; I'll re-trigger it later.

@LuciferYang reopened this Jul 21, 2022
@dongjoon-hyun (Member) commented:

This test code change adds an OR condition to the test validation, which doesn't cause any additional failures.
I'll merge this PR.

@HeartSaVioR (Contributor) commented:

Thanks @LuciferYang for dealing with this!

@LuciferYang (Contributor, Author) commented:

Thanks all ~

HyukjinKwon pushed a commit that referenced this pull request Jul 27, 2022
…oot cause

### What changes were proposed in this pull request?
This PR follows #37245:
the StageFailed event should attach the root cause.

### Why are the changes needed?
**It may be a good way for users to know the reason for a failure.**

By carefully investigating the issue https://issues.apache.org/jira/browse/SPARK-39622,
I found the root cause of the test failure: the StageFailed event does not attach the failure reason from the executor.
When OutputCommitCoordinator executes taskCompleted, the reason is ignored (see the sketch after the two event flows below).

Scenario 1: receive TaskSetFailed (Success)
> InsertIntoHadoopFsRelationCommand
> FileFormatWriter.write
> _**handleTaskSetFailed**_ (**attach root cause**)
> abortStage
> failJobAndIndependentStages
> SparkListenerJobEnd

Scenario 2: receive StageFailed (Failure)
> InsertIntoHadoopFsRelationCommand
> FileFormatWriter.write
> _**handleStageFailed**_ (**don't attach root cause**)
> abortStage
> failJobAndIndependentStages
> SparkListenerJobEnd
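
A self-contained sketch of that idea (illustrative only; these are simplified stand-ins, not Spark's actual event classes or method signatures):

```scala
// Illustrative sketch only — simplified stand-ins, not Spark's real internals.
// The point of SPARK-39868: when the commit coordinator fails a stage, the
// posted event should carry the executor-side failure instead of dropping it.
final case class StageFailed(stageId: Int, reason: String, exception: Option[Throwable])

def failAuthorizedCommitter(
    stageId: Int,
    taskError: Throwable,                 // the reason previously ignored
    post: StageFailed => Unit): Unit = {
  // Attach the root cause so abortStage (and the user) can see it.
  post(StageFailed(
    stageId,
    s"Authorized committer failed: ${taskError.getMessage}",
    Some(taskError)))
}

// Usage sketch:
failAuthorizedCommitter(
  1,
  new RuntimeException("Intentional exception for testing purposes"),
  e => println(s"stage ${e.stageId} failed: ${e.reason}"))
```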

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manually ran UT & passed GitHub Actions

Closes #37292 from panbingkun/SPARK-39868.

Authored-by: panbingkun <pbk1982@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>