Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ public static final RetryPolicy failoverOnNetworkException(
maxRetries, delayMillis, maxDelayBase);
}

public static final RetryPolicy failoverOnNetworkException(
RetryPolicy fallbackPolicy, int maxFailovers, int maxRetries,
long delayMillis, long maxDelayBase, int nnSize) {
return new FailoverOnNetworkExceptionRetry(fallbackPolicy, maxFailovers,
maxRetries, delayMillis, maxDelayBase, nnSize);
}

static class TryOnceThenFail implements RetryPolicy {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
Expand Down Expand Up @@ -620,12 +627,13 @@ protected long calculateSleepTime(int retries) {
* Fall back on underlying retry policy otherwise.
*/
static class FailoverOnNetworkExceptionRetry implements RetryPolicy {

private static final int minNnSize = 2;
private RetryPolicy fallbackPolicy;
private int maxFailovers;
private int maxRetries;
private long delayMillis;
private long maxDelayBase;
private int nnSize;

public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
int maxFailovers) {
Expand All @@ -639,19 +647,26 @@ public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,

public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
int maxFailovers, int maxRetries, long delayMillis, long maxDelayBase) {
this(fallbackPolicy, maxFailovers, maxRetries, delayMillis, maxDelayBase, minNnSize);
}

public FailoverOnNetworkExceptionRetry(RetryPolicy fallbackPolicy,
int maxFailovers, int maxRetries, long delayMillis, long maxDelayBase, int nnSize) {
this.fallbackPolicy = fallbackPolicy;
this.maxFailovers = maxFailovers;
this.maxRetries = maxRetries;
this.delayMillis = delayMillis;
this.maxDelayBase = maxDelayBase;
// set the nn size to reduce the failover sleep time.
this.nnSize = nnSize;
}

/**
* @return 0 if this is our first failover/retry (i.e., retry immediately),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments here looks need to be updated too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cndaimin I updated the code. Please take a look.

* sleep exponentially otherwise
*/
private long getFailoverOrRetrySleepTime(int times) {
return times == 0 ? 0 :
return times < (nnSize - 1) ? 0 :
calculateExponentialTime(delayMillis, times, maxDelayBase);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,16 @@ public static <T> ProxyAndInfo<T> createHAProxy(
Configuration conf, URI nameNodeUri, Class<T> xface,
AbstractNNFailoverProxyProvider<T> failoverProxyProvider) {
Preconditions.checkNotNull(failoverProxyProvider);
Map<String, Map<String, InetSocketAddress>> map =
DFSUtilClient.getAddresses(conf, null, HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
Map<String, InetSocketAddress> nnMap = map.get(nameNodeUri.getHost());
// HA case
DfsClientConf config = new DfsClientConf(conf);
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
config.getFailoverSleepMaxMillis()));
config.getFailoverSleepMaxMillis(), nnMap.size()));

Text dtService;
if (failoverProxyProvider.useLogicalURI()) {
Expand Down
Loading