
[SPARK-31438][CORE][SQL] Support JobCleaned Status in SparkListener #28280

Closed
wants to merge 3 commits

Conversation

jackylee-ch
Contributor

What changes were proposed in this pull request?

In Spark, we sometimes need to run a hook after a job is cleaned, for example to clean up Hive external temporary paths. This has already been discussed in GitHub pull request #28129.
The JobEnd event is not suitable for this. JobEnd signals that the job has finished, which happens as soon as all results have been generated. After that, the scheduler leaves any still-running tasks as zombie tasks and kills the abnormal ones asynchronously.
Thus, we add a JobCleaned status so that users can run hooks after all tasks of a job have been cleaned up, and add a JobCleanedHookListener that runs those hooks.
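
For illustration, a minimal sketch of how such a hook might be registered, assuming the JobCleanedHookListener and addCleanedHook(jobId, fun) API shown in the diff below; the listener wiring and the job id here are placeholders, not part of this PR.

import org.apache.spark.SparkContext

// Sketch only: JobCleanedHookListener and the JobCleaned event come from this PR.
val sc = SparkContext.getOrCreate()
val listener = new JobCleanedHookListener()
sc.addSparkListener(listener)

// The hook fires once every task of the job has been cleaned up, not merely
// once the job has produced all of its results. The job id 0 is a placeholder.
listener.addCleanedHook(0, id => println(s"Job $id fully cleaned; temp paths can be deleted now."))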

Does this PR introduce any user-facing change?

NO

How was this patch tested?

Ran all tests.

Change-Id: I44e4d41aa9f8ddf30c29058a92d94656410c021e
@AmplabJenkins

Can one of the admins verify this patch?

@jackylee-ch
Contributor Author

cc @rdblue @cloud-fan

// use private val and synchronized to keep thread safe
private val jobCleanedHooks = new HashMap[Int, Int => Unit]()

def addCleanedHook(jobId: Int, fun: Int => Unit): Unit = synchronized{
Member

synchronized {


override def onJobCleaned(jobCleaned: SparkListenerJobCleaned): Unit = {
jobCleanedHooks.get(jobCleaned.jobId)
.foreach{function =>
Member

foreach { function

// Make sure tmp path deleted while getting Exception before sc.runJob
deleteExternalTmpPath(hadoopConf)
throw new SparkException(
s"Failed inserting ubti table ${table.identifier.quotedString}", e)
Member

ubti table?

Contributor Author

sorry, wrong word

/**
 * JobCleanedHookListener is a basic job-cleaned listener. It holds jobCleanedHooks for
 * jobs and runs the cleaned hooks after a job is cleaned.
 */
class JobCleanedHookListener extends SparkListener with Logging {
Member

Maybe we need to add UT for the new class in SparkListenerSuite or somewhere else.

Contributor Author

Yes, I am doing it.
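
For reference, the fragments above can be assembled roughly as follows. This is an approximation reconstructed from the visible diff, not the exact PR code; SparkListenerJobCleaned and onJobCleaned are the event and callback added by this PR.

import scala.collection.mutable.HashMap

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobCleaned}

/**
 * Approximation of the listener under review: it holds per-job cleaned hooks
 * and runs the registered hook once all tasks of the job have been cleaned.
 */
class JobCleanedHookListener extends SparkListener with Logging {

  // A private map plus synchronized access keeps hook registration thread safe.
  private val jobCleanedHooks = new HashMap[Int, Int => Unit]()

  def addCleanedHook(jobId: Int, fun: Int => Unit): Unit = synchronized {
    jobCleanedHooks.put(jobId, fun)
  }

  override def onJobCleaned(jobCleaned: SparkListenerJobCleaned): Unit = {
    jobCleanedHooks.get(jobCleaned.jobId).foreach { function =>
      function(jobCleaned.jobId)
    }
  }
}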

Change-Id: Ifda55fb05cb3de8500a15e1e14ee61adf982d3ba
@jackylee-ch jackylee-ch changed the title [SPARK-31438][CORE][WIP] Support JobCleaned Status in SparkListener [SPARK-31438][CORE][SQL] Support JobCleaned Status in SparkListener May 2, 2020
@jackylee-ch
Contributor Author

cc @rdblue @cloud-fan @xuanyuanking

Change-Id: Ibf035e0ab30ba37c3522a6d6bd4c23af841fca1d
@jackylee-ch
Contributor Author

Kind reminder, @rdblue @cloud-fan @xuanyuanking.

@cloud-fan
Contributor

we sometimes need to run a hook after a job is cleaned, for example to clean up Hive external temporary paths.

Can you give more details about the use cases? There might be better ways to solve it.

@jackylee-ch
Contributor Author

Can you give more details about the use cases? There might be better ways to solve it.

@cloud-fan Thanks for your reply. Actually, this has been discussed in #28129. There are always some temporary directories left behind after the application finishes.

This happens when we run InsertIntoHiveTable or InsertIntoHiveDirCommand with spark.speculation=true.
In our environment, slow responses from executors to the driver cause some tasks to be retried, and some task attempts are still alive after the job ends. The surviving attempts keep writing results into the temporary directory, which leaves it uncleaned after the application finishes.

Here is the driver log for this.

2020-04-07 04:36:19 [dag-scheduler-event-loop]  INFO [DAGScheduler]: ResultStage 16 (sql at NativeMethodAccessorImpl.java:0) finished in 2.222 s
2020-04-07 04:36:19 [pool-3-thread-1]  INFO [DAGScheduler]: Job 2 finished: sql at NativeMethodAccessorImpl.java:0, took 23.883106 s
2020-04-07 04:36:19 [pool-3-thread-1]  INFO [FileFormatWriter]: Job null committed.
2020-04-07 04:36:19 [pool-3-thread-1]  INFO [FileFormatWriter]: Finished processing stats for job null.
2020-04-07 04:36:21 [task-result-getter-0]  WARN [TaskSetManager]: Lost task 752.0 in stage 16.0 (executor 42): TaskKilled (another attempt succeeded)
2020-04-07 04:36:21 [task-result-getter-0]  INFO [TaskSetManager]: Task 752.0 in stage 16.0 failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
2020-04-07 04:36:21 [task-result-getter-3]  WARN [TaskSetManager]: Lost task 543.0 in stage 16.0 (executor 146): TaskKilled (another attempt succeeded)
2020-04-07 04:36:21 [task-result-getter-3]  INFO [TaskSetManager]: Task 543.0 in stage 16.0 failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
2020-04-07 04:36:23 [task-result-getter-1]  WARN [TaskSetManager]: Lost task 143.0 in stage 16.0 (executor 146): TaskKilled (another attempt succeeded)
2020-04-07 04:36:23 [task-result-getter-1]  INFO [TaskSetManager]: Task 143.0 in stage 16.0 failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
2020-04-07 04:36:28 [task-result-getter-1]  INFO [YarnClusterScheduler]: Removed TaskSet 16.0, whose tasks have all completed, from pool default
2020-04-07 04:36:28 [RPC-Handler-7]  INFO [SparkUI]: Stopped Spark web UI at http://192.168.1.125:48909

@cloud-fan
Contributor

Is it possible that a task knows it fails to commit and cleans up the files?

@jackylee-ch
Contributor Author

Is it possible that a task knows it fails to commit and cleans up the files?

Em, I have thought about this.
It is a good way to remove the extra output files and could also be used in FileFormatWriter.write. However, it would still leave the temporary directory uncleaned: each task only cleans its own output directory, and none of them cleans the shared temporary directory.
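
For context, a rough sketch of what task-side cleanup looks like with the Hadoop FileSystem API; the helper name and attempt path are illustrative. It shows why this is not enough: only the task's own attempt directory is removed, while the shared staging and _temporary parents created on the driver side stay behind.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical cleanup of a failed task attempt: delete only this attempt's
// directory. The parent .hive-staging_*/-ext-10000/_temporary/0 directories
// are shared across tasks, so no single task can safely remove them.
def cleanupFailedTaskAttempt(attemptPath: Path, conf: Configuration): Unit = {
  val fs = attemptPath.getFileSystem(conf)
  if (fs.exists(attemptPath)) {
    fs.delete(attemptPath, true) // recursive delete of this attempt's output only
  }
}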

@cloud-fan
Contributor

each task only cleans its own output directory, and none of them cleans the shared temporary directory.

Can you give an example of the temp directory structure?

@jackylee-ch
Contributor Author

Can you give an example of the temp directory structure?

Here is an example.

.hive-staging_hive_2020-10-09_07-47-28_023_8877681090053776592-1/
└── -ext-10000
    └── _temporary
        └── 0
            ├── _temporary
            ├── task_20201009074730_0000_m_000000
            │   └── event_day=20180708
            │       └── part-00000-ffa4bddb-65a0-40a9-a94b-ba5eac475fb9.c000
            └── task_20201009074730_0000_m_000001
                └── event_day=20180708
                    └── part-00001-ffa4bddb-65a0-40a9-a94b-ba5eac475fb9.c000

@cloud-fan
Contributor

How about the task removing the parent directory if it's empty?
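
A rough sketch of that suggestion using the Hadoop FileSystem API; the stopAt boundary and the handling of races between concurrently finishing tasks are assumptions, not something this PR implements.

import org.apache.hadoop.fs.{FileSystem, Path}

// Walk upward from the task's directory and delete each parent that has
// become empty, stopping at the staging root so shared ancestors above it
// are never touched.
def deleteEmptyParents(fs: FileSystem, start: Path, stopAt: Path): Unit = {
  var current = start.getParent
  while (current != null && current != stopAt &&
      fs.exists(current) && fs.listStatus(current).isEmpty) {
    fs.delete(current, false) // non-recursive, so a directory that has become non-empty is not removed
    current = current.getParent
  }
}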

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 21, 2021
@github-actions github-actions bot closed this Jan 22, 2021
@whcdjj

whcdjj commented Sep 5, 2023

Is there any progress on this issue? When I was inserting into a Hive table with speculation enabled, I encountered the same problem.
