[SPARK-2033] Automatically cleanup checkpoint #855
Conversation
Why would you want to clean up checkpoint data automatically, since checkpointing is an explicit user action? I can see the need for an API to register a checkpoint for removal once it can be safely gc'ed, but note that this cleanup is not guaranteed to happen (gc need not run before app exit), which is the same limitation this PR has.
Yes, I agree with @mridulm. This should not be done automatically for all checkpoints, as it is sometimes necessary to keep them around across Spark invocations. For example, Spark Streaming saves intermediate state datasets as checkpoints and relies on them to recover from driver failures.
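For context, a minimal sketch of the checkpointing API under discussion (the directory path is illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("checkpoint-demo"))

// Checkpoint files land under this directory; before this PR, Spark never
// deleted them itself, so long-lived data (e.g. streaming recovery state)
// survived across invocations.
sc.setCheckpointDir("/tmp/checkpoints") // illustrative path

val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.checkpoint() // mark for reliable checkpointing
rdd.count()      // the first action materializes the checkpoint files
sc.stop()
```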
QA tests have started for PR 855. This patch merges cleanly.
QA results for PR 855:
This is definitely better. Can you add the documentation for this property in the configuration page?
test("automatically cleanup checkpoint data") { | ||
val conf=new SparkConf().setMaster("local[2]").setAppName("cleanupCheckpointData"). | ||
set("spark.cleaner.checkpointData.enabled","true") | ||
sc =new SparkContext(conf) |
space missing
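A sketch of how the snippet might read with the spacing fixed; everything after the SparkContext line is illustrative, since the diff is truncated here:

```scala
test("automatically cleanup checkpoint data") {
  val conf = new SparkConf().setMaster("local[2]").setAppName("cleanupCheckpointData")
    .set("spark.cleaner.checkpointData.enabled", "true")
  sc = new SparkContext(conf)
  // Illustrative continuation: checkpoint an RDD, drop the reference,
  // force a GC, and then assert that the checkpoint files are gone.
}
```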
QA tests have started for PR 855. This patch merges cleanly.
This definitely is much better, thanks for the PR!
QA results for PR 855:
@tdas this has gone stale - can you take a look?
Force-pushed from f367358 to 9cdbdaa
Test build #22601 has finished for PR 855 at commit
@@ -32,6 +32,7 @@ private sealed trait CleanupTask
private case class CleanRDD(rddId: Int) extends CleanupTask
private case class CleanShuffle(shuffleId: Int) extends CleanupTask
private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
private case class CleanRDDCheckpointData(rddId: Int) extends CleanupTask
I think you can just call this `CleanCheckpoint` here and other places.
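As a side note, a self-contained toy model of the weak-reference pattern ContextCleaner builds on, using the suggested `CleanCheckpoint` name; the names and structure here are simplified for illustration and are not Spark's actual internals:

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

sealed trait CleanupTask
case class CleanCheckpoint(rddId: Int) extends CleanupTask

// A weak reference that remembers which cleanup task to run once the
// owning object (e.g. an RDD) has been garbage collected.
class TaskReference(owner: AnyRef, val task: CleanupTask, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](owner, queue)

object CleanerModel {
  private val queue = new ReferenceQueue[AnyRef]
  private var buffer = Set.empty[TaskReference] // keeps the TaskReferences themselves reachable

  def registerForCleanup(owner: AnyRef, task: CleanupTask): Unit =
    buffer += new TaskReference(owner, task, queue)

  // Poll the queue once; in Spark a dedicated thread loops over this.
  def drainOnce(): Unit =
    Option(queue.poll()).collect { case r: TaskReference => r }.foreach { ref =>
      buffer -= ref
      ref.task match {
        case CleanCheckpoint(rddId) =>
          println(s"would delete checkpoint files of RDD $rddId here")
      }
    }
}
```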
@andrewor14 Yes, this is a good feature to have.
Force-pushed from 9cdbdaa to 4c555d3
@andrewor14 @tdas The code has been updated.
Test build #27975 has finished for PR 855 at commit
Test build #27976 has finished for PR 855 at commit
Force-pushed from 46016d3 to 6a630f0
Test build #28430 has finished for PR 855 at commit
@@ -139,6 +140,11 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
  }

  /** Register a RDDCheckpointData for cleanup when it is garbage collected. */
  def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int) {
Please add a `Unit` return type here and other places.
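With the explicit return type (and the `CleanCheckpoint` rename suggested above), the registration method would read roughly as follows; a sketch of what the reviewer is asking for, mirroring the `registerForCleanup` pattern used for broadcasts, not the final merged code:

```scala
/** Register a RDDCheckpointData for cleanup when it is garbage collected. */
def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = {
  registerForCleanup(rdd, CleanCheckpoint(parentId))
}
```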
Force-pushed from 6a630f0 to 1649850
Test build #29998 has finished for PR 855 at commit
Merging this into master, thanks @witgo
…between them (#855)

### What changes were proposed in this pull request?

This pr makes `CombineUnions` combine unions if there is a project between them. For example:

```scala
spark.range(1).selectExpr("CAST(id AS decimal(18, 1)) AS id").write.saveAsTable("t1")
spark.range(2).selectExpr("CAST(id AS decimal(18, 2)) AS id").write.saveAsTable("t2")
spark.range(3).selectExpr("CAST(id AS decimal(18, 3)) AS id").write.saveAsTable("t3")
spark.range(4).selectExpr("CAST(id AS decimal(18, 4)) AS id").write.saveAsTable("t4")
spark.range(5).selectExpr("CAST(id AS decimal(18, 5)) AS id").write.saveAsTable("t5")
spark.sql("SELECT id FROM t1 UNION SELECT id FROM t2 UNION SELECT id FROM t3 UNION SELECT id FROM t4 UNION SELECT id FROM t5").explain(true)
```

Before this pr:

```
== Optimized Logical Plan ==
Aggregate [id#36], [id#36]
+- Union false, false
   :- Aggregate [id#34], [cast(id#34 as decimal(22,5)) AS id#36]
   :  +- Union false, false
   :     :- Aggregate [id#32], [cast(id#32 as decimal(21,4)) AS id#34]
   :     :  +- Union false, false
   :     :     :- Aggregate [id#30], [cast(id#30 as decimal(20,3)) AS id#32]
   :     :     :  +- Union false, false
   :     :     :     :- Project [cast(id#25 as decimal(19,2)) AS id#30]
   :     :     :     :  +- Relation default.t1[id#25] parquet
   :     :     :     +- Project [cast(id#26 as decimal(19,2)) AS id#31]
   :     :     :        +- Relation default.t2[id#26] parquet
   :     :     +- Project [cast(id#27 as decimal(20,3)) AS id#33]
   :     :        +- Relation default.t3[id#27] parquet
   :     +- Project [cast(id#28 as decimal(21,4)) AS id#35]
   :        +- Relation default.t4[id#28] parquet
   +- Project [cast(id#29 as decimal(22,5)) AS id#37]
      +- Relation default.t5[id#29] parquet
```

After this pr:

```
== Optimized Logical Plan ==
Aggregate [id#36], [id#36]
+- Union false, false
   :- Project [cast(id#25 as decimal(22,5)) AS id#36]
   :  +- Relation default.t1[id#25] parquet
   :- Project [cast(id#26 as decimal(22,5)) AS id#46]
   :  +- Relation default.t2[id#26] parquet
   :- Project [cast(id#27 as decimal(22,5)) AS id#45]
   :  +- Relation default.t3[id#27] parquet
   :- Project [cast(id#28 as decimal(22,5)) AS id#44]
   :  +- Relation default.t4[id#28] parquet
   +- Project [cast(id#29 as decimal(22,5)) AS id#37]
      +- Relation default.t5[id#29] parquet
```

### Why are the changes needed?

Improve query performance by reducing shuffles.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #35214 from wangyum/SPARK-37915.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit ac2b0df)

* [SPARK-37915][SQL] Combine unions if there is a project between them
No description provided.