Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -597,7 +596,7 @@ public synchronized void add(OMClientResponse response, TermIndex termIndex) {
*/
private synchronized boolean canFlush() {
try {
while (currentBuffer.size() == 0) {
while (currentBuffer.isEmpty()) {
// canFlush() only gets called when the readyBuffer is empty.
// Since both buffers are empty, notify once for each.
flushNotifier.notifyFlush();
Expand Down Expand Up @@ -649,29 +648,58 @@ void resume() {
isRunning.set(true);
}

CompletableFuture<Integer> awaitFlushAsync() {
return flushNotifier.await();
}

public void awaitFlush() throws InterruptedException {
flushNotifier.await();
try {
awaitFlushAsync().get();
} catch (ExecutionException e) {
// the future will never be completed exceptionally.
throw new IllegalStateException(e);
}
}

static class FlushNotifier {
private final Set<CountDownLatch> flushLatches =
ConcurrentHashMap.newKeySet();
static class Entry {
private final CompletableFuture<Integer> future = new CompletableFuture<>();
private int count;

void await() throws InterruptedException {
private CompletableFuture<Integer> await() {
count++;
return future;
}

private int complete() {
Preconditions.checkState(future.complete(count));
return future.join();
}
}

// Wait until both the current and ready buffers are flushed.
CountDownLatch latch = new CountDownLatch(2);
flushLatches.add(latch);
latch.await();
flushLatches.remove(latch);
/** The size of the map is at most two since it uses {@link #flushCount} + 2 in {@link #await()} .*/
private final Map<Integer, Entry> flushFutures = new TreeMap<>();
private int awaitCount;
private int flushCount;

synchronized CompletableFuture<Integer> await() {
awaitCount++;
final int flush = flushCount + 2;
LOG.debug("await flush {}", flush);
final Entry entry = flushFutures.computeIfAbsent(flush, key -> new Entry());
Preconditions.checkState(flushFutures.size() <= 2);
return entry.await();
}

int notifyFlush() {
int retval = flushLatches.size();
for (CountDownLatch l : flushLatches) {
l.countDown();
synchronized int notifyFlush() {
final int await = awaitCount;
final int flush = ++flushCount;
final Entry removed = flushFutures.remove(flush);
if (removed != null) {
awaitCount -= removed.complete();
}
return retval;
LOG.debug("notifyFlush {}, awaitCount: {} -> {}", flush, await, awaitCount);
return await;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,7 @@ public void testOzoneManagerDoubleBuffer(
}

@Test
public void testAwaitFlush()
throws ExecutionException, InterruptedException {
public void testAwaitFlush() throws Exception {
List<OMClientResponse> omClientResponses =
Arrays.asList(omKeyCreateResponse,
omBucketCreateResponse);
Expand All @@ -286,9 +285,9 @@ public void testAwaitFlush()
notifyCounter.incrementAndGet();
assertEquals(0, doubleBuffer.getCurrentBufferSize());
assertEquals(0, doubleBuffer.getReadyBufferSize());
flushNotifier.notifyFlush();
return null;
return flushNotifier.notifyFlush();
}).when(spyFlushNotifier).notifyFlush();
doAnswer(i -> flushNotifier.await()).when(spyFlushNotifier).await();

// Init double buffer.
for (OMClientResponse omClientResponse : omClientResponses) {
Expand All @@ -298,7 +297,7 @@ public void testAwaitFlush()
doubleBuffer.getCurrentBufferSize());

// Start double buffer and wait for flush.
Future<?> await = awaitFlush(executorService);
final Future<?> await = awaitFlush();
Future<Boolean> flusher = flushTransactions(executorService);
await.get();

Expand All @@ -311,8 +310,7 @@ public void testAwaitFlush()
assertEquals(0, doubleBuffer.getReadyBufferSize());

// Run again to make sure it works when double buffer is empty
await = awaitFlush(executorService);
await.get();
awaitFlush().get();

// Clean up.
flusher.cancel(false);
Expand Down Expand Up @@ -404,11 +402,8 @@ private OzoneManagerProtocolProtos.OMRequest s3GetSecretRequest(
}

// Return a future that waits for the flush.
private Future<Boolean> awaitFlush(ExecutorService executorService) {
return executorService.submit(() -> {
doubleBuffer.awaitFlush();
return true;
});
private Future<?> awaitFlush() {
return doubleBuffer.awaitFlushAsync();
}

private Future<Boolean> flushTransactions(ExecutorService executorService) {
Expand All @@ -432,12 +427,11 @@ public void testFlushNotifier()

// Confirm nothing waiting yet.
assertEquals(0, fn.notifyFlush());
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<Boolean>> tasks = new ArrayList<>();

// Simulate 3 waiting.
for (int i = 0; i < 3; i++) {
tasks.add(waitFN(fn, executorService));
tasks.add(waitFN(fn));
}
Thread.sleep(2000);

Expand All @@ -448,7 +442,7 @@ public void testFlushNotifier()
assertEquals(3, fn.notifyFlush());

// Add a fourth.
tasks.add(waitFN(fn, executorService));
tasks.add(waitFN(fn));
Thread.sleep(2000);
assertEquals(4, fn.notifyFlush());

Expand All @@ -466,15 +460,7 @@ public void testFlushNotifier()

}

// Have a thread wait until notified.
private Future<Boolean> waitFN(OzoneManagerDoubleBuffer.FlushNotifier fn,
ExecutorService executorService) {
return executorService.submit(() -> {
try {
fn.await();
} catch (InterruptedException e) {
}
return true;
});
private static Future<Boolean> waitFN(OzoneManagerDoubleBuffer.FlushNotifier fn) {
return fn.await().thenApply(n -> true);
}
}