Skip to content

Commit

Permalink
[SMALLFIX] Retry master address lookup (#6886)
Browse files Browse the repository at this point in the history
* Get master address inside retry loop

* Add retries around block worker registration

* Fix checkstyle

* Address review comments

* Address review comments
  • Loading branch information
aaudiber committed Feb 19, 2018
1 parent 4b6be96 commit e45975a
Show file tree
Hide file tree
Showing 19 changed files with 724 additions and 28 deletions.
32 changes: 22 additions & 10 deletions core/common/src/main/java/alluxio/AbstractClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import alluxio.exception.status.Status;
import alluxio.exception.status.UnavailableException;
import alluxio.exception.status.UnimplementedException;
import alluxio.retry.ExponentialBackoffRetry;
import alluxio.retry.RetryPolicy;
import alluxio.retry.ExponentialTimeBoundedRetry;
import alluxio.security.authentication.TransportProvider;
import alluxio.thrift.AlluxioService;
import alluxio.thrift.AlluxioTException;
Expand All @@ -36,6 +36,7 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.regex.Pattern;

import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -53,10 +54,12 @@ public abstract class AbstractClient implements Client {
private static final Pattern FRAME_SIZE_EXCEPTION_PATTERN =
Pattern.compile("Frame size \\((\\d+)\\) larger than max length");

private static final int BASE_SLEEP_MS =
(int) Configuration.getMs(PropertyKey.USER_RPC_RETRY_BASE_SLEEP_MS);
private static final int MAX_SLEEP_MS =
(int) Configuration.getMs(PropertyKey.USER_RPC_RETRY_MAX_SLEEP_MS);
private static final Duration MAX_RETRY_DURATION =
Configuration.getDuration(PropertyKey.USER_RPC_RETRY_MAX_DURATION);
private static final Duration BASE_SLEEP_MS =
Configuration.getDuration(PropertyKey.USER_RPC_RETRY_BASE_SLEEP_MS);
private static final Duration MAX_SLEEP_MS =
Configuration.getDuration(PropertyKey.USER_RPC_RETRY_MAX_SLEEP_MS);

/** The number of times to retry a particular RPC. */
protected static final int RPC_MAX_NUM_RETRY =
Expand Down Expand Up @@ -167,16 +170,24 @@ public synchronized void connect() throws AlluxioStatusException {
Preconditions.checkState(!mClosed, "Client is closed, will not try to connect.");

RetryPolicy retryPolicy =
new ExponentialBackoffRetry(BASE_SLEEP_MS, MAX_SLEEP_MS, RPC_MAX_NUM_RETRY);
ExponentialTimeBoundedRetry.builder().withMaxDuration(MAX_RETRY_DURATION)
.withInitialSleep(BASE_SLEEP_MS).withMaxSleep(MAX_SLEEP_MS).build();
while (true) {
if (mClosed) {
throw new FailedPreconditionException("Failed to connect: client has been closed");
}
// Re-query the address in each loop iteration in case it has changed (e.g. master failover).
mAddress = getAddress();
// Re-query the address in each loop iteration in case it has changed (e.g. master
// failover).
try {
mAddress = getAddress();
} catch (UnavailableException e) {
if (!retryPolicy.attemptRetry()) {
break;
}
continue;
}
LOG.info("Alluxio client (version {}) is trying to connect with {} @ {}",
RuntimeConstants.VERSION, getServiceName(), mAddress);

TProtocol binaryProtocol =
new TBinaryProtocol(mTransportProvider.getClientTransport(mParentSubject, mAddress));
mProtocol = new TMultiplexedProtocol(binaryProtocol, getServiceName());
Expand Down Expand Up @@ -282,7 +293,8 @@ protected interface RpcCallable<V> {
*/
protected synchronized <V> V retryRPC(RpcCallable<V> rpc) throws AlluxioStatusException {
RetryPolicy retryPolicy =
new ExponentialBackoffRetry(BASE_SLEEP_MS, MAX_SLEEP_MS, RPC_MAX_NUM_RETRY);
ExponentialTimeBoundedRetry.builder().withMaxDuration(MAX_RETRY_DURATION)
.withInitialSleep(BASE_SLEEP_MS).withMaxSleep(MAX_SLEEP_MS).build();
while (!mClosed) {
Exception ex;
connect();
Expand Down
11 changes: 11 additions & 0 deletions core/common/src/main/java/alluxio/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -403,6 +404,16 @@ public static long getMs(PropertyKey key) {
}
}

/**
 * Looks up a time-valued property and returns it as a {@link Duration}.
 *
 * The value is obtained in milliseconds through {@link #getMs(PropertyKey)} and then wrapped.
 *
 * @param key the key to get the value for
 * @return the value of the key represented as a duration
 */
public static Duration getDuration(PropertyKey key) {
  final long millis = getMs(key);
  return Duration.ofMillis(millis);
}

/**
* Gets the value for the given key as a class.
*
Expand Down
19 changes: 19 additions & 0 deletions core/common/src/main/java/alluxio/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,14 @@ public String toString() {
.setDescription("Netty socket option for SO_SNDBUF: the proposed buffer size that will "
+ "be used for sends.")
.build();
// How long workers keep retrying before giving up on connecting to the master. Defaults to one
// hour and is registered as a hidden property (see the note on setIsHidden below).
public static final PropertyKey WORKER_MASTER_CONNECT_RETRY_TIMEOUT =
new Builder(Name.WORKER_MASTER_CONNECT_RETRY_TIMEOUT)
.setDescription("Retry period before workers give up on connecting to master")
.setDefaultValue("1hour")
// Leaving this hidden for now until we sort out how it should interact with
// WORKER_BLOCK_HEARTBEAT_TIMEOUT_MS.
.setIsHidden(true)
.build();
public static final PropertyKey WORKER_NETWORK_NETTY_CHANNEL =
new Builder(Name.WORKER_NETWORK_NETTY_CHANNEL)
.setDescription("Netty channel type: NIO or EPOLL.")
Expand Down Expand Up @@ -1758,6 +1766,13 @@ public String toString() {
+ "an exponential backoff. This property determines the base time "
+ "in the exponential backoff.")
.build();
// Upper bound on the total time a client RPC keeps being retried (with exponential backoff)
// before giving up. Defaults to two minutes.
public static final PropertyKey USER_RPC_RETRY_MAX_DURATION =
new Builder(Name.USER_RPC_RETRY_MAX_DURATION)
.setDefaultValue("2min")
.setDescription("Alluxio client RPCs automatically retry for transient errors with "
+ "an exponential backoff. This property determines the maximum duration to retry for"
+ " before giving up.")
.build();
public static final PropertyKey USER_RPC_RETRY_MAX_NUM_RETRY =
new Builder(Name.USER_RPC_RETRY_MAX_NUM_RETRY)
.setDefaultValue(100)
Expand Down Expand Up @@ -2315,6 +2330,8 @@ public static final class Name {
"alluxio.worker.network.netty.buffer.receive";
public static final String WORKER_NETWORK_NETTY_BUFFER_SEND =
"alluxio.worker.network.netty.buffer.send";
public static final String WORKER_MASTER_CONNECT_RETRY_TIMEOUT =
"alluxio.worker.master.connect.retry.timeout";
public static final String WORKER_NETWORK_NETTY_CHANNEL =
"alluxio.worker.network.netty.channel";
public static final String WORKER_NETWORK_NETTY_FILE_TRANSFER_TYPE =
Expand Down Expand Up @@ -2484,6 +2501,8 @@ public static final class Name {
"alluxio.user.network.netty.reader.packet.size.bytes";
public static final String USER_RPC_RETRY_BASE_SLEEP_MS =
"alluxio.user.rpc.retry.base.sleep";
public static final String USER_RPC_RETRY_MAX_DURATION =
"alluxio.user.rpc.retry.max.duration";
public static final String USER_RPC_RETRY_MAX_NUM_RETRY =
"alluxio.user.rpc.retry.max.num.retry";
public static final String USER_RPC_RETRY_MAX_SLEEP_MS = "alluxio.user.rpc.retry.max.sleep";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.NotThreadSafe;

import java.time.Clock;
import java.time.Duration;

import javax.annotation.concurrent.NotThreadSafe;

/**
* This class can be used for executing heartbeats periodically.
Expand Down Expand Up @@ -75,7 +76,7 @@ public void tick() throws InterruptedException {
mLogger.warn("{} last execution took {} ms. Longer than the interval {}", mThreadName,
executionTimeMs, mIntervalMs);
} else {
mSleeper.sleep(mIntervalMs - executionTimeMs);
mSleeper.sleep(Duration.ofMillis(mIntervalMs - executionTimeMs));
}
}
mPreviousTickMs = mClock.millis();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.retry;

import alluxio.time.TimeContext;

import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

/**
 * A retry policy which uses exponential backoff and a maximum duration time bound.
 *
 * A final retry will be performed at the time bound before giving up.
 *
 * For example, with initial sleep 10ms, maximum sleep 100ms, and maximum duration 500ms, the sleep
 * timings would be [10, 20, 40, 80, 100, 100, 100, 50], assuming the operation being retried takes
 * no time. The 50 at the end is because the previous times add up to 450, so the mechanism sleeps
 * for only 50ms before the final attempt.
 *
 * However, those are just the base sleep timings. For each sleep time, we multiply by a random
 * number from 1 to 1.1 to add jitter to avoid hotspotting.
 */
public final class ExponentialTimeBoundedRetry extends TimeBoundedRetry {
  /** Upper bound for the base (pre-jitter) sleep time. */
  private final Duration mMaxSleep;
  /** Base (pre-jitter) sleep time that the next call to computeNextWaitTime will use. */
  private Duration mNextSleep;

  /**
   * See {@link Builder}.
   */
  private ExponentialTimeBoundedRetry(TimeContext timeCtx, Duration maxDuration,
      Duration initialSleep, Duration maxSleep) {
    super(timeCtx, maxDuration);
    mMaxSleep = maxSleep;
    mNextSleep = initialSleep;
  }

  /**
   * Returns the current base sleep time plus up to 10% random jitter, and doubles the base sleep
   * time (capped at the maximum sleep) for the following call.
   *
   * @return the time to wait before the next retry
   */
  @Override
  protected Duration computeNextWaitTime() {
    Duration next = mNextSleep;
    mNextSleep = mNextSleep.multipliedBy(2);
    if (mNextSleep.compareTo(mMaxSleep) > 0) {
      mNextSleep = mMaxSleep;
    }
    // Add jitter: a random extra delay in [0, 10%) of the base sleep time.
    long jitter = Math.round(ThreadLocalRandom.current().nextDouble(0.1) * next.toMillis());
    return next.plusMillis(jitter);
  }

  /**
   * @return a builder
   */
  public static Builder builder() {
    return new Builder();
  }

  /**
   * Builder for time bounded exponential retry mechanisms.
   *
   * The initial sleep and the maximum sleep have no defaults and must be set before calling
   * {@link #build()}.
   */
  public static class Builder {
    private TimeContext mTimeCtx = TimeContext.SYSTEM;
    private Duration mMaxDuration;
    private Duration mInitialSleep;
    private Duration mMaxSleep;

    /**
     * @param timeCtx time context
     * @return the builder
     */
    public Builder withTimeCtx(TimeContext timeCtx) {
      mTimeCtx = timeCtx;
      return this;
    }

    /**
     * @param maxDuration max total duration to retry for
     * @return the builder
     */
    public Builder withMaxDuration(Duration maxDuration) {
      mMaxDuration = maxDuration;
      return this;
    }

    /**
     * @param initialSleep initial sleep interval between retries
     * @return the builder
     */
    public Builder withInitialSleep(Duration initialSleep) {
      mInitialSleep = initialSleep;
      return this;
    }

    /**
     * @param maxSleep maximum sleep interval between retries
     * @return the builder
     */
    public Builder withMaxSleep(Duration maxSleep) {
      mMaxSleep = maxSleep;
      return this;
    }

    /**
     * Builds the retry mechanism, failing fast when a sleep setting is missing instead of
     * deferring the failure to a NullPointerException inside the retry loop.
     *
     * @return the built retry mechanism
     * @throws IllegalStateException if the initial sleep or the maximum sleep has not been set
     */
    public ExponentialTimeBoundedRetry build() {
      // Both values are dereferenced by computeNextWaitTime(), so reject nulls up front. The max
      // duration is handed to the superclass unchanged and is validated (if at all) there.
      requireSet(mInitialSleep, "initial sleep");
      requireSet(mMaxSleep, "max sleep");
      return new ExponentialTimeBoundedRetry(mTimeCtx, mMaxDuration, mInitialSleep, mMaxSleep);
    }

    /**
     * Throws if a required builder setting was never provided.
     */
    private static void requireSet(Duration value, String name) {
      if (value == null) {
        throw new IllegalStateException(
            "ExponentialTimeBoundedRetry requires " + name + " to be set before build()");
      }
    }
  }
}
8 changes: 5 additions & 3 deletions core/common/src/main/java/alluxio/retry/RetryPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import javax.annotation.concurrent.NotThreadSafe;

/**
* Attempts to retry code from a do/while loop. The way that this interface works is that the logic
* Policy for determining whether retries should be performed, and potentially waiting for some time
* before the next retry attempt. The way that this interface works is that the logic
* for delayed retries (retries that sleep) can delay the caller of {@link #attemptRetry()}. Because
* of this, it's best to put retries in do/while loops to avoid the first wait.
*/
Expand All @@ -29,9 +30,10 @@ public interface RetryPolicy {
int getRetryCount();

/**
* Attempts to run the given operation, returning false if unable to (max retries have happened).
* Waits until it is time to perform the next retry, then returns. Returns false if no further
* retries should be performed.
*
* @return whether the operation have succeeded or failed (max retries have happened)
* @return whether another retry should be performed
*/
boolean attemptRetry();
}
60 changes: 60 additions & 0 deletions core/common/src/main/java/alluxio/retry/RetryUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.retry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Static helpers for running an operation under a {@link RetryPolicy}.
 */
public final class RetryUtils {
  private static final Logger LOG = LoggerFactory.getLogger(RetryUtils.class);

  private RetryUtils() {} // prevent instantiation

  /**
   * Runs the given method, re-running it each time it throws an IO exception for as long as the
   * retry policy allows another attempt. Once the policy refuses a further retry, the exception
   * from the most recent attempt is rethrown.
   *
   * @param action a description of the action that fits the phrase "Failed to ${action}"
   * @param f the function to retry
   * @param policy the retry policy to use
   * @throws IOException the failure of the last attempt, when the policy allows no more retries
   */
  public static void retry(String action, RunnableThrowsIOException f, RetryPolicy policy)
      throws IOException {
    while (true) {
      try {
        f.run();
        return;
      } catch (IOException failure) {
        int attempt = policy.getRetryCount() + 1;
        LOG.warn("Failed to {} (attempt {}): {}", action, attempt, failure.toString());
        // attemptRetry() may wait before returning; false means the policy has given up.
        if (!policy.attemptRetry()) {
          throw failure;
        }
      }
    }
  }

  /**
   * A runnable-like task that produces no result and is allowed to throw IOException.
   */
  @FunctionalInterface
  public interface RunnableThrowsIOException {
    /**
     * Runs the runnable.
     */
    void run() throws IOException;
  }
}
Loading

0 comments on commit e45975a

Please sign in to comment.