Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
Expand Down Expand Up @@ -72,6 +73,7 @@
public abstract class StorageVolume implements Checkable<Boolean, VolumeCheckResult> {

private static final Logger LOG = LoggerFactory.getLogger(StorageVolume.class);
private static final Duration TIMEOUT_FAILURE_WINDOW = Duration.ofMinutes(70);

// The name of the directory used for temporary files on the volume.
public static final String TMP_DIR_NAME = "tmp";
Expand Down Expand Up @@ -111,6 +113,7 @@ public abstract class StorageVolume implements Checkable<Boolean, VolumeCheckRes
*/
private final boolean isDiskCheckEnabled;
private SlidingWindow ioTestSlidingWindow;
private SlidingWindow timeoutFailureSlidingWindow;
private int healthCheckFileSize;

/**
Expand Down Expand Up @@ -162,6 +165,9 @@ protected StorageVolume(Builder<?> b) throws IOException {
this.isDiskCheckEnabled = dnConf.isDiskCheckEnabled();
this.ioTestSlidingWindow = new SlidingWindow(dnConf.getVolumeIOFailureTolerance(),
dnConf.getDiskCheckSlidingWindowTimeout(), b.getClock());
this.timeoutFailureSlidingWindow = new SlidingWindow(
dnConf.getVolumeIOFailureTolerance(), TIMEOUT_FAILURE_WINDOW,
b.getClock());
this.healthCheckFileSize = dnConf.getVolumeHealthCheckFileSize();
} else {
storageDir = new File(b.volumeRootStr);
Expand Down Expand Up @@ -567,6 +573,11 @@ public SlidingWindow getIoTestSlidingWindow() {
return ioTestSlidingWindow;
}

@VisibleForTesting
public SlidingWindow getTimeoutFailureSlidingWindow() {
return timeoutFailureSlidingWindow;
}

public StorageType getStorageType() {
return storageType;
}
Expand Down Expand Up @@ -752,6 +763,35 @@ public synchronized VolumeCheckResult check(@Nullable Boolean unused)
return VolumeCheckResult.HEALTHY;
}

/**
* Records a volume-check timeout in the timeout failure window.
*
* <p>This is intentionally separate from the normal IO failure window.
* Timeouts are tracked as "more than N timeout events within the timeout
* window" rather than as a consecutive counter. The time-based expiry
* automatically removes old timeout events.
*
* @return {@code true} if the number of timeout events in the timeout window
* now exceeds the tolerated threshold and the volume should be
* marked failed; {@code false} otherwise.
*/
public boolean recordTimeoutAndCheckFailure() {
timeoutFailureSlidingWindow.add();
if (timeoutFailureSlidingWindow.isExceeded()) {
LOG.error("Volume {} check timed out more than the {} tolerated times "
+ "within the past {} ms. Marking FAILED.",
this, timeoutFailureSlidingWindow.getWindowSize(),
timeoutFailureSlidingWindow.getExpiryDurationMillis());
return true;
}
LOG.warn("Volume {} check timed out. Encountered {} out of {} tolerated "
+ "timeouts within the past {} ms.",
this, timeoutFailureSlidingWindow.getNumEventsInWindow(),
timeoutFailureSlidingWindow.getWindowSize(),
timeoutFailureSlidingWindow.getExpiryDurationMillis());
return false;
}

@Override
public int hashCode() {
return Objects.hash(storageDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
Expand Down Expand Up @@ -224,6 +225,13 @@ public Set<? extends StorageVolume> checkAllVolumes(
final AtomicLong numVolumes = new AtomicLong(volumes.size());
final CountDownLatch latch = new CountDownLatch(1);

// Shared set used to guarantee exactly-one call to
// recordTimeoutAndCheckFailure() per volume, regardless of whether the
// per-check timeout (ResultHandler.onFailure) or the global latch timeout
// (pending-volumes loop below) fires first. The first path to CAS-add the
// volume owns the tolerance decision; the other path skips it.
final Set<StorageVolume> timeoutHandledSet = ConcurrentHashMap.newKeySet();

for (StorageVolume v : volumes) {
Optional<ListenableFuture<VolumeCheckResult>> olf =
delegateChecker.schedule(v, null);
Expand All @@ -232,7 +240,8 @@ public Set<? extends StorageVolume> checkAllVolumes(
allVolumes.add(v);
Futures.addCallback(olf.get(),
new ResultHandler(v, healthyVolumes, failedVolumes,
numVolumes, (ignored1, ignored2) -> latch.countDown()),
numVolumes, (ignored1, ignored2) -> latch.countDown(),
timeoutHandledSet),
MoreExecutors.directExecutor());
} else {
if (v instanceof HddsVolume) {
Expand All @@ -246,18 +255,44 @@ public Set<? extends StorageVolume> checkAllVolumes(

// Wait until our timeout elapses, after which we give up on
// the remaining volumes.
if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
LOG.warn("checkAllVolumes timed out after {} ms",
maxAllowedTimeForCheckMs);
}
boolean completedOnTime =
latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS);

synchronized (this) {
// All volumes that have not been detected as healthy should be
// considered failed. This is a superset of 'failedVolumes'.
//
// Make a copy under the mutex as Sets.difference() returns a view
// of a potentially changing set.
return new HashSet<>(Sets.difference(allVolumes, healthyVolumes));
if (!completedOnTime) {
LOG.warn("checkAllVolumes timed out after {} ms."
+ " Evaluating per-volume latch-timeout tolerance.",
maxAllowedTimeForCheckMs);
}

// Volumes that explicitly reported FAILED via check() are always
// returned — the IO-failure sliding window in StorageVolume.check()
// already applied its own tolerance.
final Set<StorageVolume> result = new HashSet<>(failedVolumes);

// Volumes still pending (neither healthy nor explicitly failed) at
// latch-timeout time. onFailure() may have already handled some of
// these via timeoutHandledSet; skip those to avoid double-counting.
final Set<StorageVolume> pendingVolumes =
new HashSet<>(Sets.difference(allVolumes,
Sets.union(healthyVolumes, failedVolumes)));

for (StorageVolume v : pendingVolumes) {
if (!timeoutHandledSet.add(v)) {
// onFailure() already handled this volume's timeout (per-check
// timeout fired before the latch). The tolerance decision was
// already made there; nothing left to do.
continue;
}
// Latch fired first — this is the first (and only) handler.
if (v.recordTimeoutAndCheckFailure()) {
// Tolerance exceeded — mark as failed.
result.add(v);
}
// else: within tolerance this round — omit from failed set.
}

return result;
}
}

Expand Down Expand Up @@ -298,7 +333,7 @@ public boolean checkVolume(final StorageVolume volume, Callback callback) {
Futures.addCallback(olf.get(),
new ResultHandler(volume,
ConcurrentHashMap.newKeySet(), ConcurrentHashMap.newKeySet(),
new AtomicLong(1), callback),
new AtomicLong(1), callback, null),
checkVolumeResultHandlerExecutorService
);
return true;
Expand All @@ -320,23 +355,39 @@ private static class ResultHandler
private final Callback callback;

/**
* @param healthyVolumes set of healthy volumes. If the disk check is
* successful, add the volume here.
* @param failedVolumes set of failed volumes. If the disk check fails,
* add the volume here.
* @param volumeCounter volumeCounter used to trigger callback invocation.
* @param callback invoked when the volumeCounter reaches 0.
* Shared set used to guarantee exactly-one call to
* {@link StorageVolume#recordTimeoutAndCheckFailure()} per volume when both
* the per-check timeout ({@link #onFailure}) and the global latch timeout
* (pending-volumes loop in {@code checkAllVolumes}) can race for the same
* volume.
* <p>
* {@code null} for the {@code checkVolume()} path, where no latch exists
* and {@link #onFailure} is the sole timeout handler.
*/
@Nullable
private final Set<StorageVolume> timeoutHandledSet;

/**
* @param healthyVolumes set of healthy volumes.
* @param failedVolumes set of failed volumes.
* @param volumeCounter triggers callback when it reaches 0.
* @param callback invoked when volumeCounter reaches 0.
* @param timeoutHandledSet shared CAS set for exactly-once timeout
* handling; {@code null} for
* {@code checkVolume()}.
*/
ResultHandler(StorageVolume volume,
Set<StorageVolume> healthyVolumes,
Set<StorageVolume> failedVolumes,
AtomicLong volumeCounter,
@Nullable Callback callback) {
@Nullable Callback callback,
@Nullable Set<StorageVolume> timeoutHandledSet) {
this.volume = volume;
this.healthyVolumes = healthyVolumes;
this.failedVolumes = failedVolumes;
this.volumeCounter = volumeCounter;
this.callback = callback;
this.timeoutHandledSet = timeoutHandledSet;
}

@Override
Expand Down Expand Up @@ -376,10 +427,34 @@ public void onFailure(@Nonnull Throwable t) {
volume, exception);
// If the scan was interrupted, do not count it as a volume failure.
// This should only happen if the volume checker is being shut down.
if (!(t instanceof InterruptedException)) {
markFailed();
cleanup();
if (t instanceof InterruptedException) {
return;
}
// Detect a per-check timeout from ThrottledAsyncChecker.
// Guava 28+ (including 33.5.0-jre used here) fails the TimeoutFuture
// with TimeoutException on timeout.
boolean isTimeout = exception instanceof TimeoutException;
if (isTimeout) {
// timeoutHandledSet is null for checkVolume() (sole timeout handler).
// For checkAllVolumes(), the set is shared with the pending-volumes
// loop; CAS-add determines which path owns the tolerance decision.
boolean firstToHandle =
(timeoutHandledSet == null) || timeoutHandledSet.add(volume);
if (firstToHandle) {
if (!volume.recordTimeoutAndCheckFailure()) {
// Within tolerance: do NOT trigger the failure callback.
// The volume is not marked failed; the next check cycle will
// re-evaluate its health. cleanup() is intentionally not called
// to avoid firing handleVolumeFailures() with an empty failed set.
return;
}
// Tolerance exceeded — fall through to markFailed()/cleanup().
}
// else: the pending-volumes loop already handled this timeout.
// Fall through to markFailed()/cleanup() for counter bookkeeping only.
}
markFailed();
cleanup();
}

private void markHealthy() {
Expand Down
Loading