-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Spark : Support parallelism in RemoveOrphanFiles #3872
Conversation
@szehon-ho and @RussellSpitzer , can you help kicked of the CI workflow for me as first time contributor? Feel free to take a look and I can do more local testing in the meantime. Thank you! |
Thanks, will take a look! I'll have to let others as I dont have permission |
Approved to run |
Updated Spark 2.4/3.0/3.1 Actions due to interface change in api, somehow it was not reflected in IDE initially as those spark version was not added as src. Anyway I did run the check and test for each spark version and they are all passing, and also rebased with latest upstream. Hopefully CI would be happy now |
Sorry, I think I did some rebase from upstream and fi, ends up re-require the approval in order to run the CI. But now this PR it's ready for review and I was wondering if @RussellSpitzer can help take a look? Add @szehon-ho as well for issue #3582 |
Sorry I've only been able to approve the ci, busy Monday and Tuesday! |
No worries, please take your time and I am happy to see all checks are passing |
...4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
Show resolved
Hide resolved
...4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
Show resolved
Hide resolved
|
||
@Test | ||
public void testConcurrentRemoveOrphanFilesWithInvalidInput() { | ||
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We usually add these tests to testInvalid***Cases()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see we did it this way in ExpireSnapshots too... I guess it's fine to do it that way here as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me, Small request though
Can we just put the changes for Spark 3.2 in this pr then the backports in another pr?
Thanks Russel, that's my original intention as well. But since I am change the interface in API, change the spark 3.2 alone but not other Spark versions will break the CI workflow. Or do you want me to separate these change out in 2 separate commits but in same PR? Maybe I understand you wrong, If you can elaborate a bit more in detail, I am happy to do it! |
Ah I totally forgot, I think it's fine then. It just makes it harder for me to check all the changes. I'll merge after we fix up the few spacing nits. |
@RussellSpitzer, I added newline as suggested by you in baseAction for spark 2.4/3.0/3.1/3.2 and confirmed that they are all the same before and after my change, I hope that helps! Also looks like CI still need your approval to run for the spacing change
|
Yep! Clicked it off, you will have full auth to run any CI after we merge this so you won't need me anymore :D |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great to me
@@ -86,6 +87,8 @@ | |||
} | |||
}, DataTypes.StringType); | |||
|
|||
private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I see a huge point to this variable (feel like a comment could have sufficed), but I guess it's there in the ExpireSnapshot version, so no matter :)
I guess the explicit assumption is that Tasks if executing without a service, executes the task on a single thread.
Thanks @dramaticlly for the PR and @szehon-ho for the review! Merged! |
Notes
Allow parallelism in removing orphan files using Spark procedures and Spark Actions, like how it was implemented in expire snapshots, This change close #3582
Since I need to change the
DeleteOrphanFiles
interface in api, ends up also updating the code for spark2.4/3.0/3.1/3.2 BaseDeleteOrphanFilesSparkAction as they are implement the same interface, also update the spark procedures to allow calling from spark SQL.Tests
I update the tests for
TestRemoveOrphanFilesAction
andTestRemoveOrphanFilesProcedure
in respective spark version. I did not touch any tests inTestRemoveOrphanFilesAction3
as it mostly cover different catalogs