diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java index 8f6553c4ec7..00aa0976ce8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolume.java @@ -28,6 +28,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; +import java.time.Duration; import java.util.Objects; import java.util.Properties; import java.util.UUID; @@ -72,6 +73,7 @@ public abstract class StorageVolume implements Checkable { private static final Logger LOG = LoggerFactory.getLogger(StorageVolume.class); + private static final Duration TIMEOUT_FAILURE_WINDOW = Duration.ofMinutes(70); // The name of the directory used for temporary files on the volume. public static final String TMP_DIR_NAME = "tmp"; @@ -111,6 +113,7 @@ public abstract class StorageVolume implements Checkable b) throws IOException { this.isDiskCheckEnabled = dnConf.isDiskCheckEnabled(); this.ioTestSlidingWindow = new SlidingWindow(dnConf.getVolumeIOFailureTolerance(), dnConf.getDiskCheckSlidingWindowTimeout(), b.getClock()); + this.timeoutFailureSlidingWindow = new SlidingWindow( + dnConf.getVolumeIOFailureTolerance(), TIMEOUT_FAILURE_WINDOW, + b.getClock()); this.healthCheckFileSize = dnConf.getVolumeHealthCheckFileSize(); } else { storageDir = new File(b.volumeRootStr); @@ -567,6 +573,11 @@ public SlidingWindow getIoTestSlidingWindow() { return ioTestSlidingWindow; } + @VisibleForTesting + public SlidingWindow getTimeoutFailureSlidingWindow() { + return timeoutFailureSlidingWindow; + } + public StorageType getStorageType() { return storageType; } @@ -752,6 +763,35 @@ public synchronized VolumeCheckResult check(@Nullable Boolean unused) return VolumeCheckResult.HEALTHY; } + /** + * Records a volume-check timeout in the timeout failure window. + * + *

This is intentionally separate from the normal IO failure window. + * Timeouts are tracked as "more than N timeout events within the timeout + * window" rather than as a consecutive counter. The time-based expiry + * automatically removes old timeout events. + * + * @return {@code true} if the number of timeout events in the timeout window + * now exceeds the tolerated threshold and the volume should be + * marked failed; {@code false} otherwise. + */ + public boolean recordTimeoutAndCheckFailure() { + timeoutFailureSlidingWindow.add(); + if (timeoutFailureSlidingWindow.isExceeded()) { + LOG.error("Volume {} check timed out more than the {} tolerated times " + + "within the past {} ms. Marking FAILED.", + this, timeoutFailureSlidingWindow.getWindowSize(), + timeoutFailureSlidingWindow.getExpiryDurationMillis()); + return true; + } + LOG.warn("Volume {} check timed out. Encountered {} out of {} tolerated " + + "timeouts within the past {} ms.", + this, timeoutFailureSlidingWindow.getNumEventsInWindow(), + timeoutFailureSlidingWindow.getWindowSize(), + timeoutFailureSlidingWindow.getExpiryDurationMillis()); + return false; + } + @Override public int hashCode() { return Objects.hash(storageDir); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java index b48b0dac118..dd760048956 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/StorageVolumeChecker.java @@ -42,6 +42,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hdds.conf.ConfigurationSource; @@ -224,6 +225,13 @@ public Set checkAllVolumes( final AtomicLong numVolumes = new AtomicLong(volumes.size()); final CountDownLatch latch = new CountDownLatch(1); + // Shared set used to guarantee exactly-one call to + // recordTimeoutAndCheckFailure() per volume, regardless of whether the + // per-check timeout (ResultHandler.onFailure) or the global latch timeout + // (pending-volumes loop below) fires first. The first path to CAS-add the + // volume owns the tolerance decision; the other path skips it. + final Set timeoutHandledSet = ConcurrentHashMap.newKeySet(); + for (StorageVolume v : volumes) { Optional> olf = delegateChecker.schedule(v, null); @@ -232,7 +240,8 @@ public Set checkAllVolumes( allVolumes.add(v); Futures.addCallback(olf.get(), new ResultHandler(v, healthyVolumes, failedVolumes, - numVolumes, (ignored1, ignored2) -> latch.countDown()), + numVolumes, (ignored1, ignored2) -> latch.countDown(), + timeoutHandledSet), MoreExecutors.directExecutor()); } else { if (v instanceof HddsVolume) { @@ -246,18 +255,44 @@ public Set checkAllVolumes( // Wait until our timeout elapses, after which we give up on // the remaining volumes. - if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) { - LOG.warn("checkAllVolumes timed out after {} ms", - maxAllowedTimeForCheckMs); - } + boolean completedOnTime = + latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS); synchronized (this) { - // All volumes that have not been detected as healthy should be - // considered failed. This is a superset of 'failedVolumes'. - // - // Make a copy under the mutex as Sets.difference() returns a view - // of a potentially changing set. - return new HashSet<>(Sets.difference(allVolumes, healthyVolumes)); + if (!completedOnTime) { + LOG.warn("checkAllVolumes timed out after {} ms." + + " Evaluating per-volume latch-timeout tolerance.", + maxAllowedTimeForCheckMs); + } + + // Volumes that explicitly reported FAILED via check() are always + // returned — the IO-failure sliding window in StorageVolume.check() + // already applied its own tolerance. + final Set result = new HashSet<>(failedVolumes); + + // Volumes still pending (neither healthy nor explicitly failed) at + // latch-timeout time. onFailure() may have already handled some of + // these via timeoutHandledSet; skip those to avoid double-counting. + final Set pendingVolumes = + new HashSet<>(Sets.difference(allVolumes, + Sets.union(healthyVolumes, failedVolumes))); + + for (StorageVolume v : pendingVolumes) { + if (!timeoutHandledSet.add(v)) { + // onFailure() already handled this volume's timeout (per-check + // timeout fired before the latch). The tolerance decision was + // already made there; nothing left to do. + continue; + } + // Latch fired first — this is the first (and only) handler. + if (v.recordTimeoutAndCheckFailure()) { + // Tolerance exceeded — mark as failed. + result.add(v); + } + // else: within tolerance this round — omit from failed set. + } + + return result; } } @@ -298,7 +333,7 @@ public boolean checkVolume(final StorageVolume volume, Callback callback) { Futures.addCallback(olf.get(), new ResultHandler(volume, ConcurrentHashMap.newKeySet(), ConcurrentHashMap.newKeySet(), - new AtomicLong(1), callback), + new AtomicLong(1), callback, null), checkVolumeResultHandlerExecutorService ); return true; @@ -320,23 +355,39 @@ private static class ResultHandler private final Callback callback; /** - * @param healthyVolumes set of healthy volumes. If the disk check is - * successful, add the volume here. - * @param failedVolumes set of failed volumes. If the disk check fails, - * add the volume here. - * @param volumeCounter volumeCounter used to trigger callback invocation. - * @param callback invoked when the volumeCounter reaches 0. + * Shared set used to guarantee exactly-one call to + * {@link StorageVolume#recordTimeoutAndCheckFailure()} per volume when both + * the per-check timeout ({@link #onFailure}) and the global latch timeout + * (pending-volumes loop in {@code checkAllVolumes}) can race for the same + * volume. + *

+ * {@code null} for the {@code checkVolume()} path, where no latch exists + * and {@link #onFailure} is the sole timeout handler. + */ + @Nullable + private final Set timeoutHandledSet; + + /** + * @param healthyVolumes set of healthy volumes. + * @param failedVolumes set of failed volumes. + * @param volumeCounter triggers callback when it reaches 0. + * @param callback invoked when volumeCounter reaches 0. + * @param timeoutHandledSet shared CAS set for exactly-once timeout + * handling; {@code null} for + * {@code checkVolume()}. */ ResultHandler(StorageVolume volume, Set healthyVolumes, Set failedVolumes, AtomicLong volumeCounter, - @Nullable Callback callback) { + @Nullable Callback callback, + @Nullable Set timeoutHandledSet) { this.volume = volume; this.healthyVolumes = healthyVolumes; this.failedVolumes = failedVolumes; this.volumeCounter = volumeCounter; this.callback = callback; + this.timeoutHandledSet = timeoutHandledSet; } @Override @@ -376,10 +427,34 @@ public void onFailure(@Nonnull Throwable t) { volume, exception); // If the scan was interrupted, do not count it as a volume failure. // This should only happen if the volume checker is being shut down. - if (!(t instanceof InterruptedException)) { - markFailed(); - cleanup(); + if (t instanceof InterruptedException) { + return; } + // Detect a per-check timeout from ThrottledAsyncChecker. + // Guava 28+ (including 33.5.0-jre used here) fails the TimeoutFuture + // with TimeoutException on timeout. + boolean isTimeout = exception instanceof TimeoutException; + if (isTimeout) { + // timeoutHandledSet is null for checkVolume() (sole timeout handler). + // For checkAllVolumes(), the set is shared with the pending-volumes + // loop; CAS-add determines which path owns the tolerance decision. + boolean firstToHandle = + (timeoutHandledSet == null) || timeoutHandledSet.add(volume); + if (firstToHandle) { + if (!volume.recordTimeoutAndCheckFailure()) { + // Within tolerance: do NOT trigger the failure callback. + // The volume is not marked failed; the next check cycle will + // re-evaluate its health. cleanup() is intentionally not called + // to avoid firing handleVolumeFailures() with an empty failed set. + return; + } + // Tolerance exceeded — fall through to markFailed()/cleanup(). + } + // else: the pending-volumes loop already handled this timeout. + // Fall through to markFailed()/cleanup() for counter bookkeeping only. + } + markFailed(); + cleanup(); } private void markHealthy() { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java index 71cb7af04b7..8345593a633 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeChecker.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.FAILED; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.isNull; @@ -28,8 +29,11 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import java.io.File; import java.nio.file.Path; import java.time.Duration; @@ -39,8 +43,13 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -301,6 +310,165 @@ public void testNumScansSkipped() throws Exception { checker.shutdownAndWait(0, TimeUnit.SECONDS); } + /** + * Explicitly captures the {@link Throwable} type that + * {@link Futures#withTimeout} delivers to {@code onFailure()} when the + * deadline fires. + * + *

{@link ThrottledAsyncChecker} uses {@code Futures.withTimeout()} + * internally; this test replicates that exact call to confirm that Guava + * 28+ (including the 33.5.0-jre version used by Ozone) delivers a + * {@link TimeoutException} — NOT a {@link java.util.concurrent.CancellationException}. + * {@code CancellationException} in new Guava means external cancellation + * (e.g. executor shutdown), not a disk-check timeout, so + * {@link StorageVolumeChecker} {@code ResultHandler.onFailure()} only + * treats {@link TimeoutException} as a timeout. + */ + @Test + public void testFuturesWithTimeoutExceptionType() throws Exception { + ScheduledExecutorService scheduler = + Executors.newSingleThreadScheduledExecutor(); + AtomicReference captured = new AtomicReference<>(); + CountDownLatch done = new CountDownLatch(1); + + try { + // A future that never completes on its own — same situation as a + // check() thread blocked inside fsync(). + SettableFuture neverCompletes = SettableFuture.create(); + + // Wrap with a real Futures.withTimeout(), identical to what + // ThrottledAsyncChecker does. + ListenableFuture timedFuture = + Futures.withTimeout(neverCompletes, 100, TimeUnit.MILLISECONDS, + scheduler); + + Futures.addCallback(timedFuture, new FutureCallback() { + @Override + public void onSuccess(VolumeCheckResult result) { + done.countDown(); + } + + @Override + public void onFailure(Throwable t) { + captured.set(t); + done.countDown(); + } + }, MoreExecutors.directExecutor()); + + assertTrue(done.await(2, TimeUnit.SECONDS), + "Timeout should have fired within 2 seconds"); + + Throwable thrown = captured.get(); + LOG.info("Futures.withTimeout() delivered to onFailure(): {}", + thrown.getClass().getName()); + + // Guava 28+ delivers TimeoutException for a timeout. + // CancellationException would mean external cancellation, not a timeout. + assertTrue(thrown instanceof TimeoutException, + "Expected TimeoutException from Futures.withTimeout() but got: " + + thrown.getClass().getName()); + } finally { + scheduler.shutdownNow(); + } + } + + /** + * Verifies that when the per-check timeout inside {@link ThrottledAsyncChecker} + * fires on {@link StorageVolumeChecker#checkVolume}, the first timeout is + * tolerated (callback not invoked, volume not failed) and the second timeout + * within the timeout window causes the volume to be failed. + * + *

This test uses the real {@link ThrottledAsyncChecker} (not + * {@link DummyChecker}) so that the actual {@code TimeoutException} + * path in {@code ResultHandler.onFailure()} fires. + */ + @Test + public void testCheckVolumeTimeoutTolerance() throws Exception { + setup(); + // Use a very short check timeout so the test completes quickly. + OzoneConfiguration timeoutConf = new OzoneConfiguration(); + DatanodeConfiguration dnConf = timeoutConf.getObject(DatanodeConfiguration.class); + dnConf.setDiskCheckTimeout(Duration.ofMillis(200)); + dnConf.setDiskCheckMinGap(Duration.ZERO); + timeoutConf.setFromObject(dnConf); + + // A latch-controlled mock volume: check() blocks until released. + HddsVolume volume = mock(HddsVolume.class); + CountDownLatch blockLatch = new CountDownLatch(1); + when(volume.check(any())).thenAnswer(inv -> { + blockLatch.await(); + return VolumeCheckResult.HEALTHY; + }); + // First timeout returns false (within tolerance), second returns true. + when(volume.recordTimeoutAndCheckFailure()).thenReturn(false).thenReturn(true); + + AtomicLong callbackCount = new AtomicLong(0); + StorageVolumeChecker checker = + new StorageVolumeChecker(timeoutConf, new FakeTimer(), "test-"); + + // First checkVolume — should time out and be tolerated (callback NOT fired). + checker.checkVolume(volume, (healthy, failed) -> callbackCount.incrementAndGet()); + + // Wait long enough for the timeout to fire. + Thread.sleep(600); + assertEquals(0, callbackCount.get(), + "Callback must NOT fire for a tolerated timeout"); + verify(volume, times(1)).recordTimeoutAndCheckFailure(); + + // Second checkVolume — should time out and exceed tolerance (callback fired). + checker.checkVolume(volume, (healthy, failed) -> callbackCount.incrementAndGet()); + Thread.sleep(600); + + assertEquals(1, callbackCount.get(), + "Callback must fire when tolerance is exceeded"); + + blockLatch.countDown(); + checker.shutdownAndWait(5, TimeUnit.SECONDS); + } + + /** + * Verifies that when the {@code checkAllVolumes()} latch times out and + * pending volumes have not yet reported a result, the timeout is recorded in + * the timeout window and the first timeout is tolerated. + * + *

This test confirms that {@code recordTimeoutAndCheckFailure()} is called on + * pending volumes, and that the volume is NOT in the returned failed set on + * the first timeout. + */ + @Test + public void testCheckAllVolumesLatchTimeoutTolerance() throws Exception { + setup(); + OzoneConfiguration timeoutConf = new OzoneConfiguration(); + DatanodeConfiguration dnConf = timeoutConf.getObject(DatanodeConfiguration.class); + dnConf.setDiskCheckTimeout(Duration.ofMillis(200)); + dnConf.setDiskCheckMinGap(Duration.ZERO); + timeoutConf.setFromObject(dnConf); + + HddsVolume volume = mock(HddsVolume.class); + CountDownLatch blockLatch = new CountDownLatch(1); + when(volume.check(any())).thenAnswer(inv -> { + blockLatch.await(); + return VolumeCheckResult.HEALTHY; + }); + // Simulate: first timeout is within tolerance. + when(volume.recordTimeoutAndCheckFailure()).thenReturn(false); + when(volume.getVolumeInfoStats()).thenReturn( + new VolumeInfoMetrics("test-vol", volume)); + + StorageVolumeChecker checker = + new StorageVolumeChecker(timeoutConf, new FakeTimer(), "test-"); + + Set failed = + checker.checkAllVolumes(Arrays.asList(volume)); + + assertFalse(failed.contains(volume), + "Volume must NOT be in failed set on first tolerated timeout"); + verify(volume, times(1)).recordTimeoutAndCheckFailure(); + + blockLatch.countDown(); + checker.shutdownAndWait(5, TimeUnit.SECONDS); + } + /** * A checker to wraps the result of {@link HddsVolume#check} in * an ImmediateFuture. diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java index d291916adf0..35a4bd7d611 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/volume/TestStorageVolumeHealthChecks.java @@ -18,11 +18,14 @@ package org.apache.hadoop.ozone.container.common.volume; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.time.Instant; import java.util.UUID; import java.util.stream.Stream; import org.apache.commons.io.FileUtils; @@ -89,6 +92,7 @@ public void setup() throws Exception { // needs to be cleared before each test. FileUtils.deleteDirectory(volumePath.toFile()); DiskCheckUtil.clearTestImpl(); + TEST_CLOCK.set(Instant.now()); } @ParameterizedTest @@ -324,6 +328,69 @@ public void testCorrectDirectoryChecked(StorageVolume.Builder builder) volume.check(false); } + /** + * With the default settings (ioFailureTolerance=1), the first simulated + * timeout within the timeout window must be tolerated. + */ + @ParameterizedTest + @MethodSource("volumeBuilders") + public void testFirstTimeoutWithinWindowIsTolerated( + StorageVolume.Builder builder) + throws Exception { + StorageVolume volume = builder.build(); + volume.format(CLUSTER_ID); + volume.createTmpDirs(CLUSTER_ID); + + assertFalse(volume.recordTimeoutAndCheckFailure(), + "First timeout within the window should be tolerated"); + assertEquals(1, volume.getTimeoutFailureSlidingWindow().getNumEventsInWindow()); + } + + /** + * With the default settings (ioFailureTolerance=1), the second timeout + * within the timeout window must fail the volume. + */ + @ParameterizedTest + @MethodSource("volumeBuilders") + public void testSecondTimeoutWithinWindowFails( + StorageVolume.Builder builder) + throws Exception { + StorageVolume volume = builder.build(); + volume.format(CLUSTER_ID); + volume.createTmpDirs(CLUSTER_ID); + + assertFalse(volume.recordTimeoutAndCheckFailure(), + "First timeout should be tolerated"); + assertEquals(1, volume.getTimeoutFailureSlidingWindow().getNumEventsInWindow()); + + assertTrue(volume.recordTimeoutAndCheckFailure(), + "Second timeout in the window should exceed tolerance and return true"); + assertEquals(1, volume.getTimeoutFailureSlidingWindow().getWindowSize()); + } + + /** + * Timeout events automatically expire from the timeout window, so an old + * timeout does not need an explicit reset before a later timeout is + * tolerated again. + */ + @ParameterizedTest + @MethodSource("volumeBuilders") + public void testExpiredTimeoutDoesNotCountTowardLaterFailure( + StorageVolume.Builder builder) throws Exception { + StorageVolume volume = builder.build(); + volume.format(CLUSTER_ID); + volume.createTmpDirs(CLUSTER_ID); + + assertFalse(volume.recordTimeoutAndCheckFailure(), + "First timeout should be tolerated"); + TEST_CLOCK.fastForward( + volume.getTimeoutFailureSlidingWindow().getExpiryDurationMillis() + 1); + + assertFalse(volume.recordTimeoutAndCheckFailure(), + "Timeout after the window expires should be tolerated again"); + assertEquals(1, volume.getTimeoutFailureSlidingWindow().getNumEventsInWindow()); + } + /** * Asserts that the disk checks are being done on the correct directory for * each volume type.