Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

9559 Improved asserts for latches. #9669

Merged
merged 1 commit into from
Nov 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -170,16 +171,16 @@ void testCompactPathToKeyValueAsync_compactionDisabled() throws IOException, Int
void testCompactionCancelled() throws IOException, InterruptedException {
CountDownLatch compactLatch = new CountDownLatch(1);
CountDownLatch testLatch = new CountDownLatch(3);
initCompactorMock(objectKeyToPath, nextBoolean(), testLatch, compactLatch);
initCompactorMock(pathToHashKeyValue, nextBoolean(), testLatch, compactLatch);
initCompactorMock(hashStoreDisk, nextBoolean(), testLatch, compactLatch);
initCompactorMock(objectKeyToPath, nextBoolean(), testLatch, compactLatch, new AtomicBoolean());
initCompactorMock(pathToHashKeyValue, nextBoolean(), testLatch, compactLatch, new AtomicBoolean());
initCompactorMock(hashStoreDisk, nextBoolean(), testLatch, compactLatch, new AtomicBoolean());

coordinator.compactDiskStoreForObjectKeyToPathAsync();
coordinator.compactDiskStoreForHashesAsync();
coordinator.compactPathToKeyValueAsync();

// let all compactions get to the latch
assertAwait(testLatch);
assertTrue(await(testLatch), "Test latch wasn't released");

// latch is released by interruption of the compaction thread
coordinator.stopAndDisableBackgroundCompaction();
Expand Down Expand Up @@ -216,12 +217,15 @@ void testCompactionWithNullNullables() throws IOException, InterruptedException

CountDownLatch compactLatch = new CountDownLatch(1);
CountDownLatch testLatch = new CountDownLatch(1);
initCompactorMock(pathToHashKeyValue, nextBoolean(), testLatch, compactLatch);
AtomicBoolean compactLatchAwaitResult = new AtomicBoolean(false);
initCompactorMock(pathToHashKeyValue, nextBoolean(), testLatch, compactLatch, compactLatchAwaitResult);

coordinator.compactPathToKeyValueAsync();

// let all compactions get to the latch
assertAwait(testLatch);
assertTrue(await(testLatch), "Test latch wasn't released");
compactLatch.countDown();
assertEventuallyTrue(compactLatchAwaitResult::get, Duration.ofSeconds(1), "Compaction latch wasn't released");

// latch is released by interruption of the compaction thread
coordinator.stopAndDisableBackgroundCompaction();
Expand Down Expand Up @@ -253,20 +257,25 @@ private void testCompaction(
throws IOException, InterruptedException {
CountDownLatch testLatch = new CountDownLatch(1);
CountDownLatch compactLatch = new CountDownLatch(1);
initCompactorMock(compactorToTest, compactionPassed, testLatch, compactLatch);
AtomicBoolean compactLatchAwaitResult = new AtomicBoolean(false);
initCompactorMock(compactorToTest, compactionPassed, testLatch, compactLatch, compactLatchAwaitResult);

// run twice to make sure that the second call is discarded because one compaction is already in progress
methodCall.run();
methodCall.run();
if (expectCompactionStarted) {
assertAwait(testLatch);
assertTrue(await(testLatch), "Test latch wasn't released");
}
compactLatch.countDown();
if (expectCompactionStarted) {
assertEventuallyTrue(
compactLatchAwaitResult::get, Duration.ofSeconds(1), "Compaction latch wasn't released");
}

assertCompactable(compactorToTest, expectCompactionStarted);

reset(objectKeyToPath, pathToHashKeyValue, hashStoreDisk, statisticsUpdater);
initCompactorMock(compactorToTest, compactionPassed, testLatch, compactLatch);
initCompactorMock(compactorToTest, compactionPassed, testLatch, compactLatch, new AtomicBoolean());

// the second time it should succeed as well
methodCall.run();
Expand All @@ -277,7 +286,7 @@ private void testCompactionFailed(DataFileCompactor compactorToTest, Runnable me
throws IOException, InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
when(compactorToTest.compact()).thenAnswer(invocation -> {
assertAwait(latch);
await(latch);
throw new RuntimeException("testCompactionFailed");
});

Expand All @@ -297,20 +306,21 @@ private void testCompactionFailed(DataFileCompactor compactorToTest, Runnable me
"Unexpected mock state");
}

private static void assertAwait(CountDownLatch latch) throws InterruptedException {
assertTrue(latch.await(10, TimeUnit.SECONDS), "Latch wasn't released");
private boolean await(CountDownLatch latch) throws InterruptedException {
return latch.await(500, TimeUnit.MILLISECONDS);
}

@SuppressWarnings("unchecked")
private void initCompactorMock(
DataFileCompactor compactorToTest,
boolean compactionPassed,
CountDownLatch testLatch,
CountDownLatch compactLatch)
CountDownLatch compactLatch,
AtomicBoolean awaitResult)
throws IOException, InterruptedException {
when(compactorToTest.compact()).thenAnswer(invocation -> {
testLatch.countDown();
assertAwait(compactLatch);
awaitResult.set(await(compactLatch));
return compactionPassed;
});
}
Expand Down
Loading