From 71c990fb7ffbc03066d4e99865abdf92acf1018b Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Mon, 1 Jan 2024 13:28:13 -0800 Subject: [PATCH 1/2] HDDS-10039. Remove the flushLatches set from FlushNotifier. --- .../om/ratis/OzoneManagerDoubleBuffer.java | 66 +++++++++++++------ .../ratis/TestOzoneManagerDoubleBuffer.java | 36 ++++------ 2 files changed, 58 insertions(+), 44 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index 1b4e3cafc47d..0717313556a3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -28,12 +28,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Queue; -import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -597,7 +597,7 @@ public synchronized void add(OMClientResponse response, TermIndex termIndex) { */ private synchronized boolean canFlush() { try { - while (currentBuffer.size() == 0) { + while (currentBuffer.isEmpty()) { // canFlush() only gets called when the readyBuffer is empty. // Since both buffers are empty, notify once for each. flushNotifier.notifyFlush(); @@ -649,29 +649,57 @@ void resume() { isRunning.set(true); } + CompletableFuture awaitFlushAsync() { + return flushNotifier.await(); + } + public void awaitFlush() throws InterruptedException { - flushNotifier.await(); + try { + awaitFlushAsync().get(); + } catch (ExecutionException e) { + // the future will never be completed exceptionally. + throw new IllegalStateException(e); + } } static class FlushNotifier { - private final Set flushLatches = - ConcurrentHashMap.newKeySet(); + static class Entry { + private final CompletableFuture future = new CompletableFuture<>(); + private int count; - void await() throws InterruptedException { + private CompletableFuture await() { + count++; + return future; + } - // Wait until both the current and ready buffers are flushed. - CountDownLatch latch = new CountDownLatch(2); - flushLatches.add(latch); - latch.await(); - flushLatches.remove(latch); + private int complete() { + Preconditions.checkState(future.complete(count)); + return future.join(); + } } - int notifyFlush() { - int retval = flushLatches.size(); - for (CountDownLatch l : flushLatches) { - l.countDown(); - } - return retval; + /** The size of the map is at most two since it uses {@link #flushCount} + 2 in {@link #await()} .*/ + private final Map flushFutures = new TreeMap<>(); + private int awaitCount; + private int flushCount; + + synchronized CompletableFuture await() { + awaitCount++; + final int flush = flushCount + 2; + LOG.debug("await flush {}", flush); + final Entry entry = flushFutures.computeIfAbsent(flush, key -> new Entry()); + Preconditions.checkState(flushFutures.size() <= 2); + return entry.await(); + } + + synchronized int notifyFlush() { + final int await = awaitCount; + final int flush = ++flushCount; + awaitCount -= Optional.ofNullable(flushFutures.remove(flush)) + .map(Entry::complete) + .orElse(0); + LOG.debug("notifyFlush {}, awaitCount: {} -> {}", flush, await, awaitCount); + return await; } } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java index dba210986014..d1a7049ffb41 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java @@ -264,8 +264,7 @@ public void testOzoneManagerDoubleBuffer( } @Test - public void testAwaitFlush() - throws ExecutionException, InterruptedException { + public void testAwaitFlush() throws Exception { List omClientResponses = Arrays.asList(omKeyCreateResponse, omBucketCreateResponse); @@ -286,9 +285,9 @@ public void testAwaitFlush() notifyCounter.incrementAndGet(); assertEquals(0, doubleBuffer.getCurrentBufferSize()); assertEquals(0, doubleBuffer.getReadyBufferSize()); - flushNotifier.notifyFlush(); - return null; + return flushNotifier.notifyFlush(); }).when(spyFlushNotifier).notifyFlush(); + doAnswer(i -> flushNotifier.await()).when(spyFlushNotifier).await(); // Init double buffer. for (OMClientResponse omClientResponse : omClientResponses) { @@ -298,7 +297,7 @@ public void testAwaitFlush() doubleBuffer.getCurrentBufferSize()); // Start double buffer and wait for flush. - Future await = awaitFlush(executorService); + final Future await = awaitFlush(); Future flusher = flushTransactions(executorService); await.get(); @@ -311,8 +310,7 @@ public void testAwaitFlush() assertEquals(0, doubleBuffer.getReadyBufferSize()); // Run again to make sure it works when double buffer is empty - await = awaitFlush(executorService); - await.get(); + awaitFlush().get(); // Clean up. flusher.cancel(false); @@ -404,11 +402,8 @@ private OzoneManagerProtocolProtos.OMRequest s3GetSecretRequest( } // Return a future that waits for the flush. - private Future awaitFlush(ExecutorService executorService) { - return executorService.submit(() -> { - doubleBuffer.awaitFlush(); - return true; - }); + private Future awaitFlush() { + return doubleBuffer.awaitFlushAsync(); } private Future flushTransactions(ExecutorService executorService) { @@ -432,12 +427,11 @@ public void testFlushNotifier() // Confirm nothing waiting yet. assertEquals(0, fn.notifyFlush()); - ExecutorService executorService = Executors.newCachedThreadPool(); List> tasks = new ArrayList<>(); // Simulate 3 waiting. for (int i = 0; i < 3; i++) { - tasks.add(waitFN(fn, executorService)); + tasks.add(waitFN(fn)); } Thread.sleep(2000); @@ -448,7 +442,7 @@ public void testFlushNotifier() assertEquals(3, fn.notifyFlush()); // Add a fourth. - tasks.add(waitFN(fn, executorService)); + tasks.add(waitFN(fn)); Thread.sleep(2000); assertEquals(4, fn.notifyFlush()); @@ -466,15 +460,7 @@ public void testFlushNotifier() } - // Have a thread wait until notified. - private Future waitFN(OzoneManagerDoubleBuffer.FlushNotifier fn, - ExecutorService executorService) { - return executorService.submit(() -> { - try { - fn.await(); - } catch (InterruptedException e) { - } - return true; - }); + private static Future waitFN(OzoneManagerDoubleBuffer.FlushNotifier fn) { + return fn.await().thenApply(n -> true); } } From 0dd36b4315c47aad08b167d36a13eac64782977f Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Tue, 9 Jan 2024 09:33:08 -0800 Subject: [PATCH 2/2] Do not use Optional. --- .../hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index 0717313556a3..96bad99e87e1 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -28,7 +28,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Queue; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; @@ -695,9 +694,10 @@ synchronized CompletableFuture await() { synchronized int notifyFlush() { final int await = awaitCount; final int flush = ++flushCount; - awaitCount -= Optional.ofNullable(flushFutures.remove(flush)) - .map(Entry::complete) - .orElse(0); + final Entry removed = flushFutures.remove(flush); + if (removed != null) { + awaitCount -= removed.complete(); + } LOG.debug("notifyFlush {}, awaitCount: {} -> {}", flush, await, awaitCount); return await; }