From e45975a5b0bedc0e8b410b231a687c069a06956c Mon Sep 17 00:00:00 2001 From: Andrew Audibert Date: Mon, 19 Feb 2018 12:05:19 -0800 Subject: [PATCH] [SMALLFIX] Retry master address lookup (#6886) * Get master address inside retry loop * Add retries around block worker registration * Fix checkstyle * Address review comments * Address review comments --- .../src/main/java/alluxio/AbstractClient.java | 32 +++-- .../src/main/java/alluxio/Configuration.java | 11 ++ .../src/main/java/alluxio/PropertyKey.java | 19 +++ .../java/alluxio/heartbeat/SleepingTimer.java | 7 +- .../retry/ExponentialTimeBoundedRetry.java | 117 ++++++++++++++++++ .../main/java/alluxio/retry/RetryPolicy.java | 8 +- .../main/java/alluxio/retry/RetryUtils.java | 60 +++++++++ .../java/alluxio/retry/TimeBoundedRetry.java | 84 +++++++++++++ .../src/main/java/alluxio/time/Sleeper.java | 8 +- .../main/java/alluxio/time/ThreadSleeper.java | 6 +- .../main/java/alluxio/time/TimeContext.java | 47 +++++++ .../test/java/alluxio/clock/ManualClock.java | 17 ++- .../alluxio/heartbeat/SleepingTimerTest.java | 8 +- .../ExponentialTimeBoundedRetryTest.java | 86 +++++++++++++ .../java/alluxio/retry/RetryUtilsTest.java | 53 ++++++++ .../test/java/alluxio/time/ManualSleeper.java | 96 ++++++++++++++ .../java/alluxio/time/ManualSleeperTest.java | 71 +++++++++++ .../worker/block/DefaultBlockWorker.java | 11 +- pom.xml | 11 ++ 19 files changed, 724 insertions(+), 28 deletions(-) create mode 100644 core/common/src/main/java/alluxio/retry/ExponentialTimeBoundedRetry.java create mode 100644 core/common/src/main/java/alluxio/retry/RetryUtils.java create mode 100644 core/common/src/main/java/alluxio/retry/TimeBoundedRetry.java create mode 100644 core/common/src/main/java/alluxio/time/TimeContext.java create mode 100644 core/common/src/test/java/alluxio/retry/ExponentialTimeBoundedRetryTest.java create mode 100644 core/common/src/test/java/alluxio/retry/RetryUtilsTest.java create mode 100644 core/common/src/test/java/alluxio/time/ManualSleeper.java create mode 100644 core/common/src/test/java/alluxio/time/ManualSleeperTest.java diff --git a/core/common/src/main/java/alluxio/AbstractClient.java b/core/common/src/main/java/alluxio/AbstractClient.java index e02c27cd88b7..992e7a16bf3f 100644 --- a/core/common/src/main/java/alluxio/AbstractClient.java +++ b/core/common/src/main/java/alluxio/AbstractClient.java @@ -18,8 +18,8 @@ import alluxio.exception.status.Status; import alluxio.exception.status.UnavailableException; import alluxio.exception.status.UnimplementedException; -import alluxio.retry.ExponentialBackoffRetry; import alluxio.retry.RetryPolicy; +import alluxio.retry.ExponentialTimeBoundedRetry; import alluxio.security.authentication.TransportProvider; import alluxio.thrift.AlluxioService; import alluxio.thrift.AlluxioTException; @@ -36,6 +36,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.regex.Pattern; import javax.annotation.concurrent.ThreadSafe; @@ -53,10 +54,12 @@ public abstract class AbstractClient implements Client { private static final Pattern FRAME_SIZE_EXCEPTION_PATTERN = Pattern.compile("Frame size \\((\\d+)\\) larger than max length"); - private static final int BASE_SLEEP_MS = - (int) Configuration.getMs(PropertyKey.USER_RPC_RETRY_BASE_SLEEP_MS); - private static final int MAX_SLEEP_MS = - (int) Configuration.getMs(PropertyKey.USER_RPC_RETRY_MAX_SLEEP_MS); + private static final Duration MAX_RETRY_DURATION = + Configuration.getDuration(PropertyKey.USER_RPC_RETRY_MAX_DURATION); + private static final Duration BASE_SLEEP_MS = + Configuration.getDuration(PropertyKey.USER_RPC_RETRY_BASE_SLEEP_MS); + private static final Duration MAX_SLEEP_MS = + Configuration.getDuration(PropertyKey.USER_RPC_RETRY_MAX_SLEEP_MS); /** The number of times to retry a particular RPC. */ protected static final int RPC_MAX_NUM_RETRY = @@ -167,16 +170,24 @@ public synchronized void connect() throws AlluxioStatusException { Preconditions.checkState(!mClosed, "Client is closed, will not try to connect."); RetryPolicy retryPolicy = - new ExponentialBackoffRetry(BASE_SLEEP_MS, MAX_SLEEP_MS, RPC_MAX_NUM_RETRY); + ExponentialTimeBoundedRetry.builder().withMaxDuration(MAX_RETRY_DURATION) + .withInitialSleep(BASE_SLEEP_MS).withMaxSleep(MAX_SLEEP_MS).build(); while (true) { if (mClosed) { throw new FailedPreconditionException("Failed to connect: client has been closed"); } - // Re-query the address in each loop iteration in case it has changed (e.g. master failover). - mAddress = getAddress(); + // Re-query the address in each loop iteration in case it has changed (e.g. master + // failover). + try { + mAddress = getAddress(); + } catch (UnavailableException e) { + if (!retryPolicy.attemptRetry()) { + break; + } + continue; + } LOG.info("Alluxio client (version {}) is trying to connect with {} @ {}", RuntimeConstants.VERSION, getServiceName(), mAddress); - TProtocol binaryProtocol = new TBinaryProtocol(mTransportProvider.getClientTransport(mParentSubject, mAddress)); mProtocol = new TMultiplexedProtocol(binaryProtocol, getServiceName()); @@ -282,7 +293,8 @@ protected interface RpcCallable { */ protected synchronized V retryRPC(RpcCallable rpc) throws AlluxioStatusException { RetryPolicy retryPolicy = - new ExponentialBackoffRetry(BASE_SLEEP_MS, MAX_SLEEP_MS, RPC_MAX_NUM_RETRY); + ExponentialTimeBoundedRetry.builder().withMaxDuration(MAX_RETRY_DURATION) + .withInitialSleep(BASE_SLEEP_MS).withMaxSleep(MAX_SLEEP_MS).build(); while (!mClosed) { Exception ex; connect(); diff --git a/core/common/src/main/java/alluxio/Configuration.java b/core/common/src/main/java/alluxio/Configuration.java index 288b31758467..3929a8230fc1 100644 --- a/core/common/src/main/java/alluxio/Configuration.java +++ b/core/common/src/main/java/alluxio/Configuration.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import java.lang.management.ManagementFactory; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -403,6 +404,16 @@ public static long getMs(PropertyKey key) { } } + /** + * Gets the time of the key as a duration. + * + * @param key the key to get the value for + * @return the value of the key represented as a duration + */ + public static Duration getDuration(PropertyKey key) { + return Duration.ofMillis(getMs(key)); + } + /** * Gets the value for the given key as a class. * diff --git a/core/common/src/main/java/alluxio/PropertyKey.java b/core/common/src/main/java/alluxio/PropertyKey.java index b7564f352fba..49bac733ddb2 100644 --- a/core/common/src/main/java/alluxio/PropertyKey.java +++ b/core/common/src/main/java/alluxio/PropertyKey.java @@ -1060,6 +1060,14 @@ public String toString() { .setDescription("Netty socket option for SO_SNDBUF: the proposed buffer size that will " + "be used for sends.") .build(); + public static final PropertyKey WORKER_MASTER_CONNECT_RETRY_TIMEOUT = + new Builder(Name.WORKER_MASTER_CONNECT_RETRY_TIMEOUT) + .setDescription("Retry period before workers give up on connecting to master") + .setDefaultValue("1hour") + // Leaving this hidden for now until we sort out how it should interact with + // WORKER_BLOCK_HEARTBEAT_TIMEOUT_MS. + .setIsHidden(true) + .build(); public static final PropertyKey WORKER_NETWORK_NETTY_CHANNEL = new Builder(Name.WORKER_NETWORK_NETTY_CHANNEL) .setDescription("Netty channel type: NIO or EPOLL.") @@ -1758,6 +1766,13 @@ public String toString() { + "an exponential backoff. This property determines the base time " + "in the exponential backoff.") .build(); + public static final PropertyKey USER_RPC_RETRY_MAX_DURATION = + new Builder(Name.USER_RPC_RETRY_MAX_DURATION) + .setDefaultValue("2min") + .setDescription("Alluxio client RPCs automatically retry for transient errors with " + + "an exponential backoff. This property determines the maximum duration to retry for" + + " before giving up.") + .build(); public static final PropertyKey USER_RPC_RETRY_MAX_NUM_RETRY = new Builder(Name.USER_RPC_RETRY_MAX_NUM_RETRY) .setDefaultValue(100) @@ -2315,6 +2330,8 @@ public static final class Name { "alluxio.worker.network.netty.buffer.receive"; public static final String WORKER_NETWORK_NETTY_BUFFER_SEND = "alluxio.worker.network.netty.buffer.send"; + public static final String WORKER_MASTER_CONNECT_RETRY_TIMEOUT = + "alluxio.worker.master.connect.retry.timeout"; public static final String WORKER_NETWORK_NETTY_CHANNEL = "alluxio.worker.network.netty.channel"; public static final String WORKER_NETWORK_NETTY_FILE_TRANSFER_TYPE = @@ -2484,6 +2501,8 @@ public static final class Name { "alluxio.user.network.netty.reader.packet.size.bytes"; public static final String USER_RPC_RETRY_BASE_SLEEP_MS = "alluxio.user.rpc.retry.base.sleep"; + public static final String USER_RPC_RETRY_MAX_DURATION = + "alluxio.user.rpc.retry.max.duration"; public static final String USER_RPC_RETRY_MAX_NUM_RETRY = "alluxio.user.rpc.retry.max.num.retry"; public static final String USER_RPC_RETRY_MAX_SLEEP_MS = "alluxio.user.rpc.retry.max.sleep"; diff --git a/core/common/src/main/java/alluxio/heartbeat/SleepingTimer.java b/core/common/src/main/java/alluxio/heartbeat/SleepingTimer.java index 5eb846ead4f3..33331ea757c6 100644 --- a/core/common/src/main/java/alluxio/heartbeat/SleepingTimer.java +++ b/core/common/src/main/java/alluxio/heartbeat/SleepingTimer.java @@ -18,9 +18,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.NotThreadSafe; - import java.time.Clock; +import java.time.Duration; + +import javax.annotation.concurrent.NotThreadSafe; /** * This class can be used for executing heartbeats periodically. @@ -75,7 +76,7 @@ public void tick() throws InterruptedException { mLogger.warn("{} last execution took {} ms. Longer than the interval {}", mThreadName, executionTimeMs, mIntervalMs); } else { - mSleeper.sleep(mIntervalMs - executionTimeMs); + mSleeper.sleep(Duration.ofMillis(mIntervalMs - executionTimeMs)); } } mPreviousTickMs = mClock.millis(); diff --git a/core/common/src/main/java/alluxio/retry/ExponentialTimeBoundedRetry.java b/core/common/src/main/java/alluxio/retry/ExponentialTimeBoundedRetry.java new file mode 100644 index 000000000000..77d0ee53f24f --- /dev/null +++ b/core/common/src/main/java/alluxio/retry/ExponentialTimeBoundedRetry.java @@ -0,0 +1,117 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.retry; + +import alluxio.time.TimeContext; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +/** + * A retry policy which uses exponential backoff and a maximum duration time bound. + * + * A final retry will be performed at the time bound before giving up. + * + * For example, with initial sleep 10ms, maximum sleep 100ms, and maximum duration 500ms, the sleep + * timings would be [10, 20, 40, 80, 100, 100, 100, 50], assuming the operation being retries takes + * no time. The 50 at the end is because the previous times add up to 450, so the mechanism sleeps + * for only 50ms before the final attempt. + * + * However, those are just the base sleep timings. For each sleep time, we multiply by a random + * number from 1 to 1.1 to add jitter to avoid hotspotting. + */ +public final class ExponentialTimeBoundedRetry extends TimeBoundedRetry { + private final Duration mMaxSleep; + private Duration mNextSleep; + + /** + * See {@link Builder}. + */ + private ExponentialTimeBoundedRetry(TimeContext timeCtx, Duration maxDuration, + Duration initialSleep, Duration maxSleep) { + super(timeCtx, maxDuration); + mMaxSleep = maxSleep; + mNextSleep = initialSleep; + } + + @Override + protected Duration computeNextWaitTime() { + Duration next = mNextSleep; + mNextSleep = mNextSleep.multipliedBy(2); + if (mNextSleep.compareTo(mMaxSleep) > 0) { + mNextSleep = mMaxSleep; + } + // Add jitter. + long jitter = Math.round(ThreadLocalRandom.current().nextDouble(0.1) * next.toMillis()); + return next.plusMillis(jitter); + } + + /** + * @return a builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for time bounded exponential retry mechanisms. + */ + public static class Builder { + private TimeContext mTimeCtx = TimeContext.SYSTEM; + private Duration mMaxDuration; + private Duration mInitialSleep; + private Duration mMaxSleep; + + /** + * @param timeCtx time context + * @return the builder + */ + public Builder withTimeCtx(TimeContext timeCtx) { + mTimeCtx = timeCtx; + return this; + } + + /** + * @param maxDuration max total duration to retry for + * @return the builder + */ + public Builder withMaxDuration(Duration maxDuration) { + mMaxDuration = maxDuration; + return this; + } + + /** + * @param initialSleep initial sleep interval between retries + * @return the builder + */ + public Builder withInitialSleep(Duration initialSleep) { + mInitialSleep = initialSleep; + return this; + } + + /** + * @param maxSleep maximum sleep interval between retries + * @return the builder + */ + public Builder withMaxSleep(Duration maxSleep) { + mMaxSleep = maxSleep; + return this; + } + + /** + * @return the built retry mechanism + */ + public ExponentialTimeBoundedRetry build() { + return new ExponentialTimeBoundedRetry(mTimeCtx, mMaxDuration, mInitialSleep, mMaxSleep); + } + } +} diff --git a/core/common/src/main/java/alluxio/retry/RetryPolicy.java b/core/common/src/main/java/alluxio/retry/RetryPolicy.java index 24969a6e9ad1..585ee4f6a775 100644 --- a/core/common/src/main/java/alluxio/retry/RetryPolicy.java +++ b/core/common/src/main/java/alluxio/retry/RetryPolicy.java @@ -14,7 +14,8 @@ import javax.annotation.concurrent.NotThreadSafe; /** - * Attempts to retry code from a do/while loop. The way that this interface works is that the logic + * Policy for determining whether retries should be performed, and potentially waiting for some time + * before the next retry attempt. The way that this interface works is that the logic * for delayed retries (retries that sleep) can delay the caller of {@link #attemptRetry()}. Because * of this, its best to put retries in do/while loops to avoid the first wait. */ @@ -29,9 +30,10 @@ public interface RetryPolicy { int getRetryCount(); /** - * Attempts to run the given operation, returning false if unable to (max retries have happened). + * Waits until it is time to perform the next retry, then returns. Returns false if no further + * retries should be performed. * - * @return whether the operation have succeeded or failed (max retries have happened) + * @return whether another retry should be performed */ boolean attemptRetry(); } diff --git a/core/common/src/main/java/alluxio/retry/RetryUtils.java b/core/common/src/main/java/alluxio/retry/RetryUtils.java new file mode 100644 index 000000000000..7cfdd07139ca --- /dev/null +++ b/core/common/src/main/java/alluxio/retry/RetryUtils.java @@ -0,0 +1,60 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.retry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Utilities for performing retries. + */ +public final class RetryUtils { + private static final Logger LOG = LoggerFactory.getLogger(RetryUtils.class); + + /** + * Retries the given method until it doesn't throw an IO exception or the retry policy expires. If + * the retry policy expires, the last exception generated will be rethrown. + * + * @param action a description of the action that fits the phrase "Failed to ${action}" + * @param f the function to retry + * @param policy the retry policy to use + */ + public static void retry(String action, RunnableThrowsIOException f, RetryPolicy policy) + throws IOException { + IOException e; + do { + try { + f.run(); + return; + } catch (IOException ioe) { + e = ioe; + LOG.warn("Failed to {} (attempt {}): {}", action, policy.getRetryCount() + 1, e.toString()); + } + } while (policy.attemptRetry()); + throw e; + } + + /** + * Interface for methods which return nothing and may throw IOException. + */ + @FunctionalInterface + public interface RunnableThrowsIOException { + /** + * Runs the runnable. + */ + void run() throws IOException; + } + + private RetryUtils() {} // prevent instantiation +} diff --git a/core/common/src/main/java/alluxio/retry/TimeBoundedRetry.java b/core/common/src/main/java/alluxio/retry/TimeBoundedRetry.java new file mode 100644 index 000000000000..ab5e462f92d1 --- /dev/null +++ b/core/common/src/main/java/alluxio/retry/TimeBoundedRetry.java @@ -0,0 +1,84 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.retry; + +import alluxio.time.Sleeper; +import alluxio.time.TimeContext; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; + +/** + * Retry mechanism which performs retries until a certain period of time has elapsed. Subclasses + * determine the interval between retries. + */ +public abstract class TimeBoundedRetry implements RetryPolicy { + private final Clock mClock; + private final Sleeper mSleeper; + private final Duration mMaxDuration; + private final Instant mStartTime; + private final Instant mEndTime; + + private int mRetryCount = 0; + private boolean mDone = false; + + /** + * @param timeCtx the time context to use for time-based operations + * @param maxDuration the maximum duration + */ + public TimeBoundedRetry(TimeContext timeCtx, Duration maxDuration) { + mClock = timeCtx.getClock(); + mSleeper = timeCtx.getSleeper(); + mMaxDuration = maxDuration; + mRetryCount = 0; + mStartTime = mClock.instant(); + mEndTime = mStartTime.plus(mMaxDuration); + } + + @Override + public int getRetryCount() { + return mRetryCount; + } + + @Override + public boolean attemptRetry() { + if (mDone) { + return false; + } + Instant now = mClock.instant(); + // We should not do a retry if now == mEndTime. The final retry is timed to land at mEndTime, + // so if now == mEndTime, the operation may have taken less than 1ms. + if (!now.isBefore(mEndTime)) { + mDone = true; + return false; + } + try { + Duration nextWaitTime = computeNextWaitTime(); + if (now.plus(nextWaitTime).isAfter(mEndTime)) { + nextWaitTime = Duration.between(now, mEndTime); + mDone = true; + } + mSleeper.sleep(nextWaitTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + mRetryCount++; + return true; + } + + /** + * @return how long to wait before the next retry + */ + protected abstract Duration computeNextWaitTime(); +} diff --git a/core/common/src/main/java/alluxio/time/Sleeper.java b/core/common/src/main/java/alluxio/time/Sleeper.java index cd8c494189c4..cc972c7cd97e 100644 --- a/core/common/src/main/java/alluxio/time/Sleeper.java +++ b/core/common/src/main/java/alluxio/time/Sleeper.java @@ -11,16 +11,18 @@ package alluxio.time; +import java.time.Duration; + /** * An interface for a utility which provides a sleep method. */ public interface Sleeper { /** - * Sleeps for the given number of milliseconds. + * Sleeps for the given duration. * - * @param millis the number of milliseconds to sleep for + * @param duration the duration to sleep for * @throws InterruptedException if the sleep is interrupted */ - void sleep(long millis) throws InterruptedException; + void sleep(Duration duration) throws InterruptedException; } diff --git a/core/common/src/main/java/alluxio/time/ThreadSleeper.java b/core/common/src/main/java/alluxio/time/ThreadSleeper.java index 7b9e4b57de45..f7a66b4c8053 100644 --- a/core/common/src/main/java/alluxio/time/ThreadSleeper.java +++ b/core/common/src/main/java/alluxio/time/ThreadSleeper.java @@ -11,6 +11,8 @@ package alluxio.time; +import java.time.Duration; + /** * A sleeping utility which delegates to Thread.sleep(). */ @@ -22,7 +24,7 @@ public class ThreadSleeper implements Sleeper { public ThreadSleeper() {} @Override - public void sleep(long millis) throws InterruptedException { - Thread.sleep(millis); + public void sleep(Duration duration) throws InterruptedException { + Thread.sleep(duration.toMillis()); } } diff --git a/core/common/src/main/java/alluxio/time/TimeContext.java b/core/common/src/main/java/alluxio/time/TimeContext.java new file mode 100644 index 000000000000..107c44ece804 --- /dev/null +++ b/core/common/src/main/java/alluxio/time/TimeContext.java @@ -0,0 +1,47 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.time; + +import java.time.Clock; + +/** + * Context for managing time. + */ +public final class TimeContext { + public static final TimeContext SYSTEM = new TimeContext(Clock.systemUTC(), new ThreadSleeper()); + + private final Clock mClock; + private final Sleeper mSleeper; + + /** + * @param clock the clock for this context + * @param sleeper the sleeper for this context + */ + public TimeContext(Clock clock, Sleeper sleeper) { + mClock = clock; + mSleeper = sleeper; + } + + /** + * @return the clock for this context + */ + public Clock getClock() { + return mClock; + } + + /** + * @return the sleeper for thix context + */ + public Sleeper getSleeper() { + return mSleeper; + } +} diff --git a/core/common/src/test/java/alluxio/clock/ManualClock.java b/core/common/src/test/java/alluxio/clock/ManualClock.java index e0b82e8f6a1b..57474cd1e9c5 100644 --- a/core/common/src/test/java/alluxio/clock/ManualClock.java +++ b/core/common/src/test/java/alluxio/clock/ManualClock.java @@ -12,8 +12,10 @@ package alluxio.clock; import java.time.Clock; +import java.time.Duration; import java.time.Instant; import java.time.ZoneId; +import java.time.ZoneOffset; /** * A manually set clock useful for testing. @@ -55,6 +57,15 @@ public synchronized void addTimeMs(long timeMs) { mTimeMs += timeMs; } + /** + * Moves the clock forward the specified duration. + * + * @param time the duration to add + */ + public synchronized void addTime(Duration time) { + mTimeMs += time.toMillis(); + } + @Override public synchronized long millis() { return mTimeMs; @@ -62,16 +73,16 @@ public synchronized long millis() { @Override public ZoneId getZone() { - return null; + return ZoneOffset.UTC; } @Override public Clock withZone(ZoneId zone) { - return null; + throw new UnsupportedOperationException("ManualClock only uses UTC"); } @Override public Instant instant() { - return null; + return Instant.ofEpochMilli(millis()); } } diff --git a/core/common/src/test/java/alluxio/heartbeat/SleepingTimerTest.java b/core/common/src/test/java/alluxio/heartbeat/SleepingTimerTest.java index 81780a5941d5..30a46b7ecf8c 100644 --- a/core/common/src/test/java/alluxio/heartbeat/SleepingTimerTest.java +++ b/core/common/src/test/java/alluxio/heartbeat/SleepingTimerTest.java @@ -19,6 +19,8 @@ import org.mockito.Mockito; import org.slf4j.Logger; +import java.time.Duration; + /** * Unit tests for {@link SleepingTimer}. */ @@ -54,9 +56,9 @@ public void sleepForSpecifiedInterval() throws Exception { final SleepingTimer timer = new SleepingTimer(THREAD_NAME, INTERVAL_MS, mMockLogger, mFakeClock, mMockSleeper); timer.tick(); // first tick won't sleep - Mockito.verify(mMockSleeper, Mockito.times(0)).sleep(Mockito.anyLong()); + Mockito.verify(mMockSleeper, Mockito.times(0)).sleep(Mockito.any(Duration.class)); timer.tick(); - Mockito.verify(mMockSleeper).sleep(INTERVAL_MS); + Mockito.verify(mMockSleeper).sleep(Duration.ofMillis(INTERVAL_MS)); } /** @@ -72,6 +74,6 @@ public void maintainInterval() throws Exception { stimer.tick(); mFakeClock.addTimeMs(INTERVAL_MS / 3); stimer.tick(); - Mockito.verify(mMockSleeper).sleep(INTERVAL_MS - (INTERVAL_MS / 3)); + Mockito.verify(mMockSleeper).sleep(Duration.ofMillis(INTERVAL_MS - (INTERVAL_MS / 3))); } } diff --git a/core/common/src/test/java/alluxio/retry/ExponentialTimeBoundedRetryTest.java b/core/common/src/test/java/alluxio/retry/ExponentialTimeBoundedRetryTest.java new file mode 100644 index 000000000000..10a1549055ff --- /dev/null +++ b/core/common/src/test/java/alluxio/retry/ExponentialTimeBoundedRetryTest.java @@ -0,0 +1,86 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.retry; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertThat; + +import alluxio.Constants; +import alluxio.clock.ManualClock; +import alluxio.time.ManualSleeper; +import alluxio.time.TimeContext; +import alluxio.util.CommonUtils; + +import org.junit.Test; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Iterator; + +/** + * Unit tests for {@link ExponentialTimeBoundedRetry}. + */ +public final class ExponentialTimeBoundedRetryTest { + @Test + public void exponentialBackoff() throws InterruptedException { + // Run the test multiple times to cover more cases due to randomness of jitter. This can be + // cranked up for debugging purposes. We keep it low to avoid taking too much test time. + for (int i = 0; i < 2; i++) { + ManualClock clock = new ManualClock(); + ManualSleeper sleeper = new ManualSleeper(); + long maxDurationMs = 500; + long taskTimeMs = 20; + ExponentialTimeBoundedRetry retry = ExponentialTimeBoundedRetry.builder() + .withTimeCtx(new TimeContext(clock, sleeper)) + .withMaxDuration(Duration.ofMillis(maxDurationMs)) + .withInitialSleep(Duration.ofMillis(10)) + .withMaxSleep(Duration.ofMillis(100)) + .build(); + + Thread thread = new Thread(() -> { + do { + CommonUtils.sleepMs(taskTimeMs); + clock.addTimeMs(taskTimeMs); + } while (retry.attemptRetry()); + }); + thread.setDaemon(true); + thread.start(); + thread.setName("time-bounded-exponential-backoff-test"); + + long timeRemainingMs = maxDurationMs - taskTimeMs; + Iterator expectedBaseTimes = Arrays.asList(10L, 20L, 40L, 80L, 100L).iterator(); + long expectedTime = expectedBaseTimes.next(); + while (timeRemainingMs > 0) { + Duration actualSleep = sleeper.waitForSleep(); + // Account for 10% added jitter. + checkBetween(expectedTime, expectedTime * 1.1, actualSleep); + timeRemainingMs -= actualSleep.toMillis() + taskTimeMs; + clock.addTime(actualSleep); + sleeper.wakeUp(); + if (expectedBaseTimes.hasNext()) { + expectedTime = expectedBaseTimes.next(); + } + if (timeRemainingMs < 100) { + expectedTime = timeRemainingMs; + } + } + thread.interrupt(); + thread.join(10 * Constants.SECOND_MS); + } + } + + private void checkBetween(double start, double end, Duration time) { + assertThat((double) time.toMillis(), greaterThanOrEqualTo(start)); + assertThat((double) time.toMillis(), lessThanOrEqualTo(end)); + } +} diff --git a/core/common/src/test/java/alluxio/retry/RetryUtilsTest.java b/core/common/src/test/java/alluxio/retry/RetryUtilsTest.java new file mode 100644 index 000000000000..739c9c3092fd --- /dev/null +++ b/core/common/src/test/java/alluxio/retry/RetryUtilsTest.java @@ -0,0 +1,53 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.retry; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Unit tests for {@link RetryUtils}. + */ +public class RetryUtilsTest { + @Test + public void success() throws IOException { + AtomicInteger count = new AtomicInteger(0); + RetryUtils.retry("success test", () -> { + count.incrementAndGet(); + if (count.get() == 5) { + return; + } + throw new IOException("Fail"); + }, new CountingRetry(10)); + assertEquals(5, count.get()); + } + + @Test + public void failure() throws IOException { + AtomicInteger count = new AtomicInteger(0); + try { + RetryUtils.retry("failure test", () -> { + count.incrementAndGet(); + throw new IOException(Integer.toString(count.get())); + }, new CountingRetry(10)); + fail("Expected an exception to be thrown"); + } catch (IOException e) { + assertEquals("11", e.getMessage()); + } + assertEquals(11, count.get()); + } +} diff --git a/core/common/src/test/java/alluxio/time/ManualSleeper.java b/core/common/src/test/java/alluxio/time/ManualSleeper.java new file mode 100644 index 000000000000..6db0f7655334 --- /dev/null +++ b/core/common/src/test/java/alluxio/time/ManualSleeper.java @@ -0,0 +1,96 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.time; + +import com.google.common.base.Preconditions; + +import java.time.Duration; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Sleeper which can be manually controlled by tests. This allows time-related classes to be tested + * deterministically, without relying on polling or calls to Thread.sleep. + * + * To use the class, pass an instance of ManualSleeper into the class to be tested, then use the + * {@link #waitForSleep()} method to verify that the class is sleeping for the right amount of time. + * After calling {@link #waitForSleep()}, use {@link #wakeUp()} to end the test class's call to + * {@link #sleep(Duration)}. + * + * See {@link ManualSleeperTest} for example usage. + */ +public class ManualSleeper implements Sleeper { + private final Lock mSleepLock = new ReentrantLock(); + private final Condition mSleepLockCond = mSleepLock.newCondition(); + + private Duration mLastSleep = Duration.ZERO; + private boolean mSleeping = false; + + @Override + public void sleep(Duration duration) throws InterruptedException { + mSleepLock.lock(); + mLastSleep = duration; + mSleeping = true; + mSleepLockCond.signalAll(); + try { + while (mSleeping) { + mSleepLockCond.await(); + } + } finally { + mSleeping = false; // handles the case where await() is interrupted + mSleepLock.unlock(); + } + } + + /** + * @return whether the sleeper is currently sleeping + */ + public boolean sleeping() { + mSleepLock.lock(); + try { + return mSleeping; + } finally { + mSleepLock.unlock(); + } + } + + /** + * Waits for the sleeper to be in a sleeping state and returns the length of time it is sleeping + * for. + */ + public Duration waitForSleep() throws InterruptedException { + mSleepLock.lock(); + try { + while (!mSleeping) { + mSleepLockCond.await(); + } + return mLastSleep; + } finally { + mSleepLock.unlock(); + } + } + + /** + * Wakes up from the current call to sleep. + */ + public void wakeUp() { + mSleepLock.lock(); + Preconditions.checkState(mSleeping, "Called wakeUp when nothing was sleeping"); + try { + mSleeping = false; + mSleepLockCond.signal(); + } finally { + mSleepLock.unlock(); + } + } +} diff --git a/core/common/src/test/java/alluxio/time/ManualSleeperTest.java b/core/common/src/test/java/alluxio/time/ManualSleeperTest.java new file mode 100644 index 000000000000..da71fc59e30f --- /dev/null +++ b/core/common/src/test/java/alluxio/time/ManualSleeperTest.java @@ -0,0 +1,71 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.time; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import alluxio.Constants; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Unit tests for {@link ManualSleeper}. + */ +public final class ManualSleeperTest { + private AtomicInteger mSleepTime; + private ManualSleeper mSleeper; + private Thread mTestThread; + + @Before + public void before() { + mSleepTime = new AtomicInteger(0); + mSleeper = new ManualSleeper(); + mTestThread = new Thread(() -> { + while (true) { + try { + mSleeper.sleep(Duration.ofMillis(mSleepTime.incrementAndGet())); + } catch (InterruptedException e) { + return; + } + } + }); + mTestThread.setDaemon(true); + mTestThread.start(); + } + + @After + public void after() throws InterruptedException { + mTestThread.interrupt(); + mTestThread.join(Constants.SECOND_MS); + } + + @Test + public void checkSleepTime() throws InterruptedException { + for (int i = 1; i < 100; i++) { + assertEquals(i, mSleeper.waitForSleep().toMillis()); + mSleeper.wakeUp(); + } + } + + @Test + public void propagateInterrupt() throws InterruptedException { + mTestThread.interrupt(); + mTestThread.join(Constants.SECOND_MS); + assertFalse(mTestThread.isAlive()); + } +} diff --git a/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java b/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java index 210a9d49ee63..906b18c9221f 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java @@ -27,6 +27,8 @@ import alluxio.master.MasterClientConfig; import alluxio.metrics.MetricsSystem; import alluxio.proto.dataserver.Protocol; +import alluxio.retry.RetryUtils; +import alluxio.retry.ExponentialTimeBoundedRetry; import alluxio.thrift.BlockWorkerClientService; import alluxio.underfs.UfsManager; import alluxio.util.CommonUtils; @@ -50,6 +52,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -197,7 +200,13 @@ public AtomicReference getWorkerId() { public void start(WorkerNetAddress address) throws IOException { mAddress = address; try { - mWorkerId.set(mBlockMasterClient.getId(address)); + RetryUtils.retry("get worker id", () -> mWorkerId.set(mBlockMasterClient.getId(address)), + ExponentialTimeBoundedRetry.builder() + .withMaxDuration(Duration + .ofMillis(Configuration.getMs(PropertyKey.WORKER_MASTER_CONNECT_RETRY_TIMEOUT))) + .withInitialSleep(Duration.ofMillis(100)) + .withMaxSleep(Duration.ofSeconds(5)) + .build()); } catch (Exception e) { throw new RuntimeException("Failed to get a worker id from block master: " + e.getMessage()); } diff --git a/pom.xml b/pom.xml index b70ed48b9bd0..470231ff21e1 100644 --- a/pom.xml +++ b/pom.xml @@ -643,6 +643,12 @@ 1.3 test + + org.hamcrest + hamcrest-all + 1.3 + test + org.mockito mockito-all @@ -737,6 +743,11 @@ junit test + + org.hamcrest + hamcrest-all + test + org.mockito mockito-all