Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Abort file groups should be under same lock as committerService #7933

Merged
merged 5 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
Expand All @@ -49,13 +50,16 @@
abstract class BaseCommitService<T> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseCommitService.class);

public static final long TIMEOUT_IN_MS_DEFAULT = TimeUnit.MINUTES.toMillis(120);

private final Table table;
private final ExecutorService committerService;
private final ConcurrentLinkedQueue<T> completedRewrites;
private final ConcurrentLinkedQueue<String> inProgressCommits;
private final List<T> committedRewrites;
private final ConcurrentLinkedQueue<T> committedRewrites;
private final int rewritesPerCommit;
private final AtomicBoolean running = new AtomicBoolean(false);
private final long timeoutInMS;

/**
* Constructs a {@link BaseCommitService}
Expand All @@ -64,17 +68,30 @@ abstract class BaseCommitService<T> implements Closeable {
* @param rewritesPerCommit number of file groups to include in a commit
*/
BaseCommitService(Table table, int rewritesPerCommit) {
// Delegates to the three-argument constructor, supplying TIMEOUT_IN_MS_DEFAULT (120 minutes
// expressed in milliseconds) as the time to wait for outstanding commits.
this(table, rewritesPerCommit, TIMEOUT_IN_MS_DEFAULT);
}

/**
* Constructs a {@link BaseCommitService}
*
* @param table table to perform commit on
* @param rewritesPerCommit number of file groups to include in a commit
* @param timeoutInMS The timeout to wait for commits to complete after all rewrite jobs have been
* completed
*/
BaseCommitService(Table table, int rewritesPerCommit, long timeoutInMS) {
this.table = table;
LOG.info(
"Creating commit service for table {} with {} groups per commit", table, rewritesPerCommit);
this.rewritesPerCommit = rewritesPerCommit;
this.timeoutInMS = timeoutInMS;

committerService =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("Committer-Service").build());

completedRewrites = Queues.newConcurrentLinkedQueue();
committedRewrites = Lists.newArrayList();
committedRewrites = Queues.newConcurrentLinkedQueue();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

committedRewrites could be updated concurrently.

inProgressCommits = Queues.newConcurrentLinkedQueue();
}

Expand Down Expand Up @@ -138,7 +155,7 @@ public List<T> results() {
Preconditions.checkState(
committerService.isShutdown(),
"Cannot get results from a service which has not been closed");
return committedRewrites;
return Lists.newArrayList(committedRewrites.iterator());
}

@Override
Expand All @@ -154,11 +171,13 @@ public void close() {
// the commit pool to finish doing its commits to Iceberg State. In the case of partial
// progress this should have been occurring simultaneously with rewrites, if not there should
// be only a single commit operation.
if (!committerService.awaitTermination(120, TimeUnit.MINUTES)) {
if (!committerService.awaitTermination(timeoutInMS, TimeUnit.MILLISECONDS)) {
ConeyLiu marked this conversation as resolved.
Show resolved Hide resolved
LOG.warn(
"Commit operation did not complete within 120 minutes of the all files "
"Commit operation did not complete within {} minutes ({} ms) of the all files "
+ "being rewritten. This may mean that some changes were not successfully committed to the "
+ "table.");
+ "table.",
TimeUnit.MILLISECONDS.toMinutes(timeoutInMS),
timeoutInMS);
timeout = true;
}
} catch (InterruptedException e) {
Expand All @@ -169,7 +188,11 @@ public void close() {

if (!completedRewrites.isEmpty() && timeout) {
LOG.error("Attempting to cleanup uncommitted file groups");
completedRewrites.forEach(this::abortFileGroup);
synchronized (completedRewrites) {
while (!completedRewrites.isEmpty()) {
ConeyLiu marked this conversation as resolved.
Show resolved Hide resolved
abortFileGroup(completedRewrites.poll());
}
}
}

Preconditions.checkArgument(
Expand Down Expand Up @@ -211,11 +234,17 @@ private void commitReadyCommitGroups() {
}
}

private boolean canCreateCommitGroup() {
/**
 * Decides whether the queued rewrites may be turned into a commit group right now.
 *
 * @return true when at least {@code rewritesPerCommit} completed rewrites are queued, or when
 *     the service is no longer running and some completed rewrites are still queued
 */
@VisibleForTesting
boolean canCreateCommitGroup() {
  // Enough completed rewrites are queued to fill a whole commit group.
  if (completedRewrites.size() >= rewritesPerCommit) {
    return true;
  }

  // Writing has finished, so whatever is left over has to be committed as a partial group.
  return !running.get() && !completedRewrites.isEmpty();
}

/**
 * Reports whether the queue of completed rewrites is currently empty.
 *
 * <p>Note that the timeout path in {@code close()} also drains this queue by polling each group
 * and passing it to {@code abortFileGroup}, so an empty queue means every queued group has been
 * taken off the queue, not necessarily that every group was committed.
 *
 * @return true if no completed rewrites remain queued
 */
@VisibleForTesting
boolean completedRewritesAllCommitted() {
return completedRewrites.isEmpty();
}
}
112 changes: 112 additions & 0 deletions core/src/test/java/org/apache/iceberg/actions/TestCommitService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Test;

public class TestCommitService extends TableTestBase {

public TestCommitService() {
super(1);
}

@Test
public void testCommittedResultsCorrectly() {
CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
commitService.start();

ExecutorService executorService = Executors.newFixedThreadPool(10);
Tasks.range(100).executeWith(executorService).run(commitService::offer);
ConeyLiu marked this conversation as resolved.
Show resolved Hide resolved
commitService.close();

Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
Set<Integer> actual = Sets.newHashSet(commitService.results());
Assertions.assertThat(actual).isEqualTo(expected);
}

@Test
public void testAbortFileGroupsAfterTimeout() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice UT!

CustomCommitService commitService = new CustomCommitService(table, 5, 200);
commitService.start();

// Add fewer file groups than rewritesPerCommit
for (int i = 0; i < 4; i++) {
commitService.offer(i);
}

// Simulate the last group of rewrites
CustomCommitService spyCommitService = spy(commitService);
doReturn(false).when(spyCommitService).canCreateCommitGroup();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RussellSpitzer @chenjunjiedada I have to mock the behavior. It is really difficult to simulate otherwise.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

Copy link
Collaborator

@szehon-ho szehon-ho Jul 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to make a CustomCommitService that implements canCreateCommitGroup() with a wait/notify (instead of using the rewritesPerCommit limit to trigger it at the same time as close, which is a bit hard to grasp)?

Something like:

CustomCommitSerivce {
  private boolean canCreateCommitGroup() {
     object.wait();
  }
}
...
for (int i = 0; i < 4; i++) {
   service.submit(commitService.offer());
}
commitService.close();
object.notify();

I am not sure if that is what you are looking for, though, as it's a bit hard for me to follow the thinking behind the test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried CountDownLatch and similar, but it is hard to trigger the problems. That's why I used the spy here.

for (int i = 4; i < 8; i++) {
Copy link
Collaborator

@szehon-ho szehon-ho Jul 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it took me a while to get this. This is to make it go above the rewritesPerCommit limit, allowing the first batch (0-4) to finally try to commit, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the problem can only be triggered when there are more than rewritesPerCommit file groups in the queue waiting to be committed by the committerService. For example, we set rewritesPerCommit to 5, and there are 10 file groups that need to be committed by the committerService. The timeout occurs during the first commit (groups 0 to 4). Then the main thread deletes the remaining 5 file groups (5 to 9). However, the committerService is not stopped and it still could commit the file groups (5 to 9).

Copy link
Collaborator

@szehon-ho szehon-ho Jul 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, the committerService is not stopped and it still could commit the file groups (5 to 9).

Is it true? I don't see how groups 5-9 will ever proceed, because we always return false for canCreateCommitGroup() (I also ran the test to check). Am I missing something?

I thought the race condition is between committing and aborting the 0-4 group.

If that is true, how about adding these comments?

// Add file groups 0-4 for commit.
// There are less than the rewritesPerCommit, and thus commitService will not commit until next batch added.
// During commit, these will sleep a fixed duration, to test race condition with close abort (below)
for (int i = 0; i < 4; i++) {
    commitService.offer(i);
 }

// Add file groups 5-9 for commit
// These are gated to not be able to commit, 
// and only serve to allow file 0-4 to proceed
CustomCommitService spyCommitService = spy(commitService);
doReturn(false).when(spyCommitService).canCreateCommitGroup();
for (int i = 4; i < 8; i++) {
   spyCommitService.offer(i);
}

// close commitService.  
// This will wait a fixed duration to abort file group 0-4
// testing race condition as they are attempt to finish commit (above)
Assertions.assertThatThrownBy(commitService::close)
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("Timeout occurred when waiting for commits");

// Wait for the commitService to finish trying to commit file groups
Awaitility.await()
    .atMost(5, TimeUnit.SECONDS)
        .pollInSameThread()
        .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());


Assertions.assertThat(commitService.results())
      .doesNotContainAnyElementsOf(commitService.aborted);

Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));

Can you check if the comments and my understanding are right?

I also feel the commitService results will not always have 0-4 then? But rather we should change the check to if they are either aborted or committed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think the original logic only has 8 groups in total.

Copy link
Contributor Author

@ConeyLiu ConeyLiu Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szehon-ho I updated the comments, please take a look.

// Add file groups [0, 4) for commit.
// There are less than the rewritesPerCommit, and thus will not trigger a commit action. Those file groups
// will be added to the completedRewrites queue.
// Now the queue has 4 file groups that need to commit.
for (int i = 0; i < 4; i++) {
    commitService.offer(i);
 }

// Add file groups [4, 8) for commit
// These are gated to not be able to commit, so all those 4 file groups will be added to the queue as well.
// Now the queue has 8 file groups that need to commit.
CustomCommitService spyCommitService = spy(commitService);
doReturn(false).when(spyCommitService).canCreateCommitGroup();
for (int i = 4; i < 8; i++) {
   spyCommitService.offer(i);
}

// close commitService. 
// The committerService thread now starts to commit those remaining file groups in the completedRewrites queue.
// And also the main thread waits for the committerService thread to finish within a timeout. There are 8 file groups
// that need to commit. The first committing will commit file groups [0, 5]. This will wait a fixed duration to simulate
// the time cost exceeding the timeout. 
// Then testing race conditions between the main thread (trying to abort remaining file groups[6, 8)), and the 
// committerService thread (trying to commit file groups[6, 8))
Assertions.assertThatThrownBy(commitService::close)
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("Timeout occurred when waiting for commits");

// Wait for the commitService to finish. Committed all file groups or abort remaining file groups. 
Awaitility.await()
    .atMost(5, TimeUnit.SECONDS)
        .pollInSameThread()
        .untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());


Assertions.assertThat(commitService.results())
      .doesNotContainAnyElementsOf(commitService.aborted);

Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));

I also feel the commitService results will not always have 0-4 then? But rather we should change the check to if they are either aborted or committed.

[0, 4] should be committed successfully; however, [5, 8) may be either aborted or committed. I think we need to keep the following:

Assertions.assertThat(commitService.results())
      .doesNotContainAnyElementsOf(commitService.aborted);

This verifies that the committed and aborted file groups do not overlap.

Copy link
Collaborator

@szehon-ho szehon-ho Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, it makes the test easier to follow now. Now I get that, on close, there's an extra condition in the main commitService loop to commit those.

To me, this is the most confusing part.

doReturn(false).when(spyCommitService).canCreateCommitGroup();

I thought this would prevent groups 4-8 from ever proceeding, but I guess it's just to prevent the main thread from committing immediately. They still get onto the completedRewrites queue, and committerService thread can commit whenever the condition is triggered as it's not using the spy?

Copy link
Collaborator

@szehon-ho szehon-ho Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion to make it a bit shorter.

// close commitService. 
// The committerService thread now starts to commit those remaining file groups in the completedRewrites queue.
// And also the main thread waits for the committerService thread to finish within a timeout. There are 8 file groups
// that need to commit. The first committing will commit file groups [0, 5]. This will wait a fixed duration to simulate
// the time cost exceeding the timeout. 
// Then testing race conditions between the main thread (trying to abort remaining file groups[6, 8)), and the 
// committerService thread (trying to commit file groups[6, 8))

=>

// close commitService. 
// This allows the committerService thread to start committing the remaining file groups [0-8] in the completedRewrites queue.
// And also the main thread waits for the committerService thread to finish within a timeout. 

// The committerService thread commits file groups [0, 5].  These will wait a fixed duration to 
// simulate timeout on the main thread, which then tries to abort file groups [6-8].
// This tests the race conditions, as the committerService is also trying to commit groups [6-8].

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but I guess it's just to prevent the main thread from committing immediately.

Actually, this is to prevent committing from the rewrite thread pool.

and committerService thread can commit whenever the condition is triggered as it's not using the spy?

That's right.

spyCommitService.offer(i);
}

Assertions.assertThatThrownBy(commitService::close)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Timeout occurred when waiting for commits");

// Wait for the committerService to finish committing the remaining file groups
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo, also suggested the following comment (see my other review comment for context)

// Wait for the commitService to finish trying to commit file groups

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will update together

Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.pollInSameThread()
.untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
Assertions.assertThat(commitService.results())
.doesNotContainAnyElementsOf(commitService.aborted);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides this, do we need to check which groups are committed? I see the commit sleeps 210ms, does that mean all groups will be aborted or four of them will be committed successfully?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the check. The last 3 should be aborted.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also a bit lost, what purpose does the timeout serve? I thought the first 4 commits are blocked by rewritesPerCommit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main thread will clean up uncommitted file groups after the timeout. Then the main thread and the committerService could both touch completedRewrites. And that's the root problem.

Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));
}

/**
 * Commit service stub used by these tests: each commit simply sleeps for a fixed time, and every
 * file group passed to {@link #abortFileGroup(Integer)} is recorded so tests can inspect it.
 */
private static class CustomCommitService extends BaseCommitService<Integer> {
  // Groups handed to abortFileGroup; read directly by the enclosing test class.
  private final Set<Integer> aborted = Sets.newConcurrentHashSet();

  /**
   * @param table table to perform commits on
   * @param rewritesPerCommit number of file groups to include in a commit
   * @param timeoutInMS time, in milliseconds, that close() waits for commits to complete; it is
   *     forwarded unchanged as the superclass's millisecond timeout
   */
  CustomCommitService(Table table, int rewritesPerCommit, int timeoutInMS) {
    super(table, rewritesPerCommit, timeoutInMS);
  }

  /** Simulates a slow commit by sleeping; the batch itself is not inspected. */
  @Override
  protected void commitOrClean(Set<Integer> batch) {
    try {
      // Slightly longer than the 200 ms timeout used by testAbortFileGroupsAfterTimeout
      Thread.sleep(210);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so code further up the stack can still observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  /** Records the aborted group instead of performing any cleanup. */
  @Override
  protected void abortFileGroup(Integer group) {
    aborted.add(group);
  }
}
}