New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core: Abort file groups should be under same lock as committerService #7933
Conversation
Here, the committerService start to commit after all the rewrite job is finished. I am not sure whether we should enable it to commit from the rewrite job starting. Because this could increase the conflicts with the rewrite job commit request.
|
Hi, @szehon-ho @rdblue @aokolnychyi @RussellSpitzer could you help to review this when you are free? Thanks in advance. |
Ideally I think we should have a test for this edge case, is there any chance we can get one of those added in? The change here seems very safe to me |
@ConeyLiu @RussellSpitzer +1 to add a unit test to guard the logic. I think we could make the timeout threshold (120 minutes right now) configurable so that it should be a bit easy to reproduce the problem unit test. |
|
||
committerService = | ||
Executors.newSingleThreadExecutor( | ||
new ThreadFactoryBuilder().setNameFormat("Committer-Service").build()); | ||
|
||
completedRewrites = Queues.newConcurrentLinkedQueue(); | ||
committedRewrites = Lists.newArrayList(); | ||
committedRewrites = Queues.newConcurrentLinkedQueue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
committedRewrites
could be updated concurrently.
|
||
// Simulate the latest group of rewrite | ||
CustomCommitService spyCommitService = spy(commitService); | ||
doReturn(false).when(spyCommitService).canCreateCommitGroup(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@RussellSpitzer @chenjunjiedada I have to mock the behavior. It is really difficult to simulate otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to make a CustomCommitServicde implement canCreateCommitGroup() with a wait/notify. (instead of using the rewritesPerCommit limit to trigger it as the same time as close, which is a bit hard to grasp).
Something like:
CustomCommitSerivce {
private boolean canCreateCommitGroup() {
object.wait();
}
}
...
for (int i = 0; i < 4; i++) {
service.submit(commitService.offer());
}
commitService.close();
object.notify();
I am not sure if that is what you are looking for, though, as its a bit hard for me to follow the test thought.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried CountDownLatch
and similar, but it is hard to trigger the problems. That's why I used the spy here.
} | ||
|
||
@Test | ||
public void testAbortFileGroupsAfterTimeout() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice UT!
.pollInSameThread() | ||
.untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue()); | ||
Assertions.assertThat(commitService.results()) | ||
.doesNotContainAnyElementsOf(commitService.aborted); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides this, do we need to check which groups are committed? I see the commit sleeps 210ms, does that means all groups will be aborted or four of them will be committed successfully?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add the checking. The last 3 should be aborted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also a bit lost, what purpose does the timeout serve? I thought the first 4 commits are blocked by rewritesPerCommit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main thread will clean up uncommitted file groups after the timeout. Then the main thread and committerService
could both tough the completedRewrites
. And that's the root problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left minor comment.
I submit a follow-up #8001 to address the commit failed by concurrent commit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks good to me. Some questions/comment trying to understand the test
core/src/test/java/org/apache/iceberg/actions/TestCommitService.java
Outdated
Show resolved
Hide resolved
// Simulate the latest group of rewrite | ||
CustomCommitService spyCommitService = spy(commitService); | ||
doReturn(false).when(spyCommitService).canCreateCommitGroup(); | ||
for (int i = 4; i < 8; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it took me awhile to get this. This is to make it go above the rewritesPerCommit limit , allowing the first batch (0-4) to finally try commit right ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the problem could only be possible to trigger when there are more than rewritesPerCommit
in the queue that needs to commit by the committerService
. For example, we set the rewritesPerCommit
to 5. And there are 10 file groups that need to commit by committerService
. The timeout occurs in the first commit (range from 0 to 4). Then main thread deletes the remaining 5 file groups (5 to 9). However, the committerService
is not stopped and it still could commit the file groups (5 to 9).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, the committerService is not stopped and it still could commit the file groups (5 to 9).
Is it true? I dont see the group 5-9 will ever proceed because we always return false for canCreateCommitGroup(). (also ran the test to check). Am I missing something?
I thought race condition is between commit or aborting 0-4 group.
If that is true, how about adding these comments?
// Add file groups 0-4 for commit.
// There are less than the rewritesPerCommit, and thus commitService will not commit until next batch added.
// During commit, these wiil sleep a fixed duration, to test race condition with close abort (below)
for (int i = 0; i < 4; i++) {
commitService.offer(i);
}
// Add file groups 5-9 for commit
// These are gated to not be able to commit,
// and only serve to allow file 0-4 to proceed
CustomCommitService spyCommitService = spy(commitService);
doReturn(false).when(spyCommitService).canCreateCommitGroup();
for (int i = 4; i < 8; i++) {
spyCommitService.offer(i);
}
// close commitService.
// This will wait a fixed duration to abort file group 0-4
// testing race condition as they are attempt to finish commit (above)
Assertions.assertThatThrownBy(commitService::close)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Timeout occurred when waiting for commits");
// Wait for the commitService to finish trying to commit file groups
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.pollInSameThread()
.untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
Assertions.assertThat(commitService.results())
.doesNotContainAnyElementsOf(commitService.aborted);
Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));
Can you check if comments and my understanding is right?
I also feel the commitService results will not always have 0-4 then? But rather we should change the check to if they are either aborted or committed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think the original logic only has 8 groups in total.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@szehon-ho I updated the comments, please take a look.
// Add file groups [0, 4) for commit.
// There are less than the rewritesPerCommit, and thus will not trigger a commit action. Those file groups
// will be added to the completedRewrites queue.
// Now the queue has 4 file groups that need to commit.
for (int i = 0; i < 4; i++) {
commitService.offer(i);
}
// Add file groups [4, 8) for commit
// These are gated to not be able to commit, so all those 4 file groups will be added to the queue as well.
// Now the queue has 8 file groups that need to commit.
CustomCommitService spyCommitService = spy(commitService);
doReturn(false).when(spyCommitService).canCreateCommitGroup();
for (int i = 4; i < 8; i++) {
spyCommitService.offer(i);
}
// close commitService.
// The committerService thread now starts to commit those remaining file groups in the completedRewrites queue.
// And also the main thread waits for the committerService thread to finish within a timeout. There are 8 file groups
// that need to commit. The first committing will commit file groups [0, 5]. This will wait a fixed duration to simulate
// the time cost exceeding the timeout.
// Then testing race conditions between the main thread (trying to abort remaining file groups[6, 8)), and the
// committerService thread (trying to commit file groups[6, 8))
Assertions.assertThatThrownBy(commitService::close)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Timeout occurred when waiting for commits");
// Wait for the commitService to finish. Committed all file groups or abort remaining file groups.
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.pollInSameThread()
.untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
Assertions.assertThat(commitService.results())
.doesNotContainAnyElementsOf(commitService.aborted);
Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));
I also feel the commitService results will not always have 0-4 then? But rather we should change the check to if they are either aborted or committed.
[0, 4] should be committed success, however, the [5, 8] may be aborted or committed. I think we need to keep the following:
Assertions.assertThat(commitService.results())
.doesNotContainAnyElementsOf(commitService.aborted);
This indicates the committed and aborted should not be overlapped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, it makes the test easier to follow now. Now i get, that when close, there's an extra condition in the main commitService loop to commit those.
To me, this is the most confusing part.
doReturn(false).when(spyCommitService).canCreateCommitGroup();
I thought this would prevent group 4-8 from ever proceeding, but I guess its just to prevent the main thread from commiting immediately. It still gets on the committedRewrites list, and committerService thread can commit whenever the condition is triggered as its not using the spy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion to make it a bit shorter.
// close commitService.
// The committerService thread now starts to commit those remaining file groups in the completedRewrites queue.
// And also the main thread waits for the committerService thread to finish within a timeout. There are 8 file groups
// that need to commit. The first committing will commit file groups [0, 5]. This will wait a fixed duration to simulate
// the time cost exceeding the timeout.
// Then testing race conditions between the main thread (trying to abort remaining file groups[6, 8)), and the
// committerService thread (trying to commit file groups[6, 8))
=>
// close commitService.
// This allows committerService thread to starts to commit the remaining file groups [0-8] in the completedRewrites queue.
// And also the main thread waits for the committerService thread to finish within a timeout.
// The committerService thread commits file groups [0, 5]. These will wait a fixed duration to
// simulate timeout on the main thread, which then tries to abort file groups [6-8].
// This tests the race conditions, as the commiterService is also trying to commit groups [6-8].
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but I guess its just to prevent the main thread from commiting immediately.
Actually this is prevent the committing from the rewrite thread pool.
and committerService thread can commit whenever the condition is triggered as its not using the spy?
That's right.
|
||
// Simulate the latest group of rewrite | ||
CustomCommitService spyCommitService = spy(commitService); | ||
doReturn(false).when(spyCommitService).canCreateCommitGroup(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to make a CustomCommitServicde implement canCreateCommitGroup() with a wait/notify. (instead of using the rewritesPerCommit limit to trigger it as the same time as close, which is a bit hard to grasp).
Something like:
CustomCommitSerivce {
private boolean canCreateCommitGroup() {
object.wait();
}
}
...
for (int i = 0; i < 4; i++) {
service.submit(commitService.offer());
}
commitService.close();
object.notify();
I am not sure if that is what you are looking for, though, as its a bit hard for me to follow the test thought.
.pollInSameThread() | ||
.untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue()); | ||
Assertions.assertThat(commitService.results()) | ||
.doesNotContainAnyElementsOf(commitService.aborted); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also a bit lost, what purpose does the timeout serve? I thought the first 4 commits are blocked by rewritesPerCommit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @ConeyLiu for the explanation and making a UT.
As we seem to be unable to use any easier to use mechanism like latch, wait/notify, can we at least add some comments (added in suggestion)?
Fix is great, but did want to get to same page and add document this test before commiting, as it does look a bit complex to maintain (I spent a long time to understand it).
// Simulate the latest group of rewrite | ||
CustomCommitService spyCommitService = spy(commitService); | ||
doReturn(false).when(spyCommitService).canCreateCommitGroup(); | ||
for (int i = 4; i < 8; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, the committerService is not stopped and it still could commit the file groups (5 to 9).
Is it true? I dont see the group 5-9 will ever proceed because we always return false for canCreateCommitGroup(). (also ran the test to check). Am I missing something?
I thought race condition is between commit or aborting 0-4 group.
If that is true, how about adding these comments?
// Add file groups 0-4 for commit.
// There are less than the rewritesPerCommit, and thus commitService will not commit until next batch added.
// During commit, these wiil sleep a fixed duration, to test race condition with close abort (below)
for (int i = 0; i < 4; i++) {
commitService.offer(i);
}
// Add file groups 5-9 for commit
// These are gated to not be able to commit,
// and only serve to allow file 0-4 to proceed
CustomCommitService spyCommitService = spy(commitService);
doReturn(false).when(spyCommitService).canCreateCommitGroup();
for (int i = 4; i < 8; i++) {
spyCommitService.offer(i);
}
// close commitService.
// This will wait a fixed duration to abort file group 0-4
// testing race condition as they are attempt to finish commit (above)
Assertions.assertThatThrownBy(commitService::close)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Timeout occurred when waiting for commits");
// Wait for the commitService to finish trying to commit file groups
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.pollInSameThread()
.untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
Assertions.assertThat(commitService.results())
.doesNotContainAnyElementsOf(commitService.aborted);
Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));
Can you check if comments and my understanding is right?
I also feel the commitService results will not always have 0-4 then? But rather we should change the check to if they are either aborted or committed.
.isInstanceOf(IllegalArgumentException.class) | ||
.hasMessageContaining("Timeout occurred when waiting for commits"); | ||
|
||
// Wait for the committerService finish commit the remaining file groups |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo, also suggested the following comment (see my other review comment for context)
// Wait for the commitService to finish trying to commit file groups
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will update together
@RussellSpitzer @szehon-ho @ConeyLiu Can we add a doc like README and some flowchart to describe the commit logic? That may help later optimizations and reviews. |
Hm.. would it make more sense as a code comment rather than separate README file? @ConeyLiu can you check if my understanding is right , for the test case, and make those changes? Wanted to get this in to 1.3.1, and maybe we can do some follow up for docs |
Thanks @szehon-ho for the detailed review, will address the comments today. |
Thanks @szehon-ho @chenjunjiedada I comment in the comments thread. Please take a look. I will update to code if it is OK. |
Thanks @ConeyLiu for patiently answering my questions, the concurrent test a bit difficult to follow for me :). I'm ok to do the comment you put, I just put one more suggestion. I'm also working on another pending 1.3.1 issue so we should be ok. |
Thanks @szehon-ho for the time to do the details review. Comments have been added. |
// completedRewrites queue. And also the main thread waits for the committerService thread to | ||
// finish within a timeout. | ||
|
||
// The committerService thread commits file groups [0-4]. These will wait a fixed duration to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry this is my fault, can we fix the groups? I guess the group is 0-3, and 4-7
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be [0-4] because the rewritesPerCommit
is 5.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry yea, long day :) Saw the 0-3 up top and thought its the same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha, already merged. Nice!
Thanks @szehon-ho for merging this and thanks @RussellSpitzer @chenjunjiedada @stevenzwu @yananli-ebay for reviewing. PR for 1.3.x submitted. |
* Hive: Set commit state as Unknown before throwing CommitStateUnknownException (apache#7931) (apache#8029) * Spark 3.4: WAP branch not propagated when using DELETE without WHERE (apache#7900) (apache#8028) * Core: Include all reachable snapshots with v1 format and REF snapshot mode (apache#7621) (apache#8027) * Spark 3.3: Backport 'WAP branch not propagated when using DELETE without WHERE' (apache#8033) (apache#8036) * Flink: remove the creation of default database in FlinkCatalog open method (apache#7795) (apache#8039) * Core: Handle optional fields (apache#8050) (apache#8064) * Core: Handle allow optional fields We expect: - current-snapshot-id - properties - snapshots to be there, but they are actually optional. * Use AssertJ * Core: Abort file groups should be under same lock as committerService (apache#7933) (apache#8060) * Spark 3.4: Fix rewrite_position_deletes for certain partition types (apache#8059) * Spark 3.3: Fix rewrite_position_deletes for certain partition types (apache#8059) (apache#8069) * Spark: Add actions for disater recovery. * Fix the compile error. * Fix merge conflicts and formatting * All tests are working and code integrated with Spark 3.3 * Fix union error and snapshots test * Fix Spark broadcast error * Add RewritePositionDeleteFilesSparkAction --------- Co-authored-by: Eduard Tudenhoefner <etudenhoefner@gmail.com> Co-authored-by: Fokko Driesprong <fokko@apache.org> Co-authored-by: Xianyang Liu <liu-xianyang@hotmail.com> Co-authored-by: Szehon Ho <szehon.apache@gmail.com> Co-authored-by: Yufei Gu <yufei_gu@apple.com> Co-authored-by: yufeigu <yufei@apache.org> Co-authored-by: Laith Alzyoud <laith.alzyoud@revolut.com> Co-authored-by: vaultah <4944562+vaultah@users.noreply.github.com>
We have met a very corner case that the rewrite job was aborted due to the timeout waiting. And some of the committed files are deleted meanwhile. The problem here is that the aborting thread (main thread) and the committerService thread could operate the
completedRewrites
concurrently.The following code could not run due to the concurrent call from the rewrite thread pool.
iceberg/core/src/main/java/org/apache/iceberg/actions/BaseCommitService.java
Line 133 in 51eaf68
And then those uncommit file groups will be committed by the committerService when all the rewrite jobs are finished. So when the main thread does the abort cleanings the committerService still could submit the commit request. However, the files that need to commit could already be deleted by the abort cleaning task.
So here we lock the main thread with the same object as committerService when aborting.