Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -62,6 +61,7 @@
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.FetchBlockLocationsRetryer;
import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
Expand Down Expand Up @@ -996,10 +996,12 @@ private DNAddrPair chooseDataNode(LocatedBlock block,

private LocatedBlock refetchLocations(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
FetchBlockLocationsRetryer retryer = dfsClient.getConf().getFetchBlockLocationsRetryer();

String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
dfsClient.getDeadNodes(this), ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
if (retryer.isMaxFailuresExceeded(failures)) {
String description = "Could not obtain block: " + blockInfo;
DFSClient.LOG.warn(description + errMsg
+ ". Throwing a BlockMissingException");
Expand All @@ -1015,21 +1017,7 @@ private LocatedBlock refetchLocations(LocatedBlock block,
+ " from any node: " + errMsg
+ ". Will get new block locations from namenode and retry...");
try {
// Introducing a random factor to the wait time before another retry.
// The wait time is dependent on # of failures and a random factor.
// At the first time of getting a BlockMissingException, the wait time
// is a random number between 0..3000 ms. If the first retry
// still fails, we will wait 3000 ms grace period before the 2nd retry.
// Also at the second retry, the waiting window is expanded to 6000 ms
// alleviating the request rate from the server. Similarly the 3rd retry
// will wait 6000ms grace period before retry and the waiting window is
// expanded to 9000ms.
final int timeWindow = dfsClient.getConf().getTimeWindow();
// grace period for the last round of attempt
double waitTime = timeWindow * failures +
// expanding time window for each failure
timeWindow * (failures + 1) *
ThreadLocalRandom.current().nextDouble();
double waitTime = retryer.getWaitTime(failures);
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
" IOException, will wait for " + waitTime + " msec.");
Thread.sleep((long)waitTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,12 @@ interface Retry {

String WINDOW_BASE_KEY = PREFIX + "window.base";
int WINDOW_BASE_DEFAULT = 3000;

String WINDOW_MULTIPLIER_KEY = PREFIX + "window.multiplier";
int WINDOW_MULTIPLIER_DEFAULT = 1;

String WINDOW_MAXIMUM_KEY = PREFIX + "window.max";
int WINDOW_MAXIMUM_DEFAULT = 30000;
}

/** dfs.client.failover configuration properties */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
Expand Down Expand Up @@ -112,7 +113,7 @@ public class DfsClientConf {
private final int maxPipelineRecoveryRetries;
private final int failoverSleepBaseMillis;
private final int failoverSleepMaxMillis;
private final int maxBlockAcquireFailures;
private final FetchBlockLocationsRetryer fetchBlockLocationsRetryer;
private final int datanodeSocketWriteTimeout;
private final int ioBufferSize;
private final ChecksumOpt defaultChecksumOpt;
Expand All @@ -124,8 +125,6 @@ public class DfsClientConf {
private final int socketTimeout;
private final int socketSendBufferSize;
private final long excludedNodesCacheExpiry;
/** Wait time window (in msec) if BlockMissingException is caught. */
private final int timeWindow;
private final int numCachedConnRetry;
private final int numBlockWriteRetry;
private final int numBlockWriteLocateFollowingRetry;
Expand Down Expand Up @@ -170,9 +169,6 @@ public DfsClientConf(Configuration conf) {
maxRetryAttempts = conf.getInt(
Retry.MAX_ATTEMPTS_KEY,
Retry.MAX_ATTEMPTS_DEFAULT);
timeWindow = conf.getInt(
Retry.WINDOW_BASE_KEY,
Retry.WINDOW_BASE_DEFAULT);
retryTimesForGetLastBlockLength = conf.getInt(
Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY,
Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
Expand All @@ -190,9 +186,8 @@ public DfsClientConf(Configuration conf) {
Failover.SLEEPTIME_MAX_KEY,
Failover.SLEEPTIME_MAX_DEFAULT);

maxBlockAcquireFailures = conf.getInt(
DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
fetchBlockLocationsRetryer = new FetchBlockLocationsRetryer(conf);

datanodeSocketWriteTimeout = conf.getInt(
DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
HdfsConstants.WRITE_TIMEOUT);
Expand Down Expand Up @@ -448,13 +443,6 @@ public int getFailoverSleepMaxMillis() {
return failoverSleepMaxMillis;
}

/**
* @return the maxBlockAcquireFailures
*/
public int getMaxBlockAcquireFailures() {
return maxBlockAcquireFailures;
}

/**
* @return the datanodeSocketWriteTimeout
*/
Expand Down Expand Up @@ -540,10 +528,11 @@ public long getExcludedNodesCacheExpiry() {
}

/**
* @return the timeWindow
*
* @return the fetchBlockLocationsRetryer
*/
public int getTimeWindow() {
return timeWindow;
public FetchBlockLocationsRetryer getFetchBlockLocationsRetryer() {
return fetchBlockLocationsRetryer;
}

/**
Expand Down Expand Up @@ -994,4 +983,98 @@ public String confAsString() {
+ domainSocketDisableIntervalSeconds;
}
}

/**
 * Decides whether fetching block locations may be retried and how long to wait
 * before the next attempt when a BlockMissingException is caught.
 *
 * All settings are read once from the configuration at construction time and
 * are immutable afterwards.
 */
public static class FetchBlockLocationsRetryer {
  private final int maxBlockAcquireFailures;
  private final int timeWindowBase;
  private final int timeWindowMultiplier;
  private final int timeWindowMax;
  private final boolean enableRandom;

  /**
   * Creates a retryer with the random factor enabled.
   *
   * @param conf configuration the retry settings are read from
   */
  public FetchBlockLocationsRetryer(Configuration conf) {
    this(conf, true);
  }

  /**
   * It helps for testing to be able to disable the random factor. It should
   * remain enabled for non-test use.
   *
   * @param conf configuration the retry settings are read from
   * @param enableRandom whether the time window is scaled by a random factor;
   *                     when false the full window is always used
   */
  @VisibleForTesting
  FetchBlockLocationsRetryer(Configuration conf, boolean enableRandom) {
    maxBlockAcquireFailures = conf.getInt(
        DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
        DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT);
    timeWindowBase = conf.getInt(
        Retry.WINDOW_BASE_KEY,
        Retry.WINDOW_BASE_DEFAULT);
    timeWindowMultiplier = conf.getInt(
        Retry.WINDOW_MULTIPLIER_KEY,
        Retry.WINDOW_MULTIPLIER_DEFAULT);
    timeWindowMax = conf.getInt(
        Retry.WINDOW_MAXIMUM_KEY,
        Retry.WINDOW_MAXIMUM_DEFAULT);
    this.enableRandom = enableRandom;
  }

  /**
   * For tests, exposes the maximum allowed failures.
   *
   * @return the configured maximum number of block acquire failures
   */
  @VisibleForTesting
  public int getMaxBlockAcquireFailures() {
    return maxBlockAcquireFailures;
  }

  /**
   * Checks whether the retry budget has been used up.
   *
   * @param numFailures number of failures seen so far
   * @return true if {@code numFailures} is greater than or equal to the
   *         maximum allowed failures
   */
  public boolean isMaxFailuresExceeded(int numFailures) {
    return numFailures >= maxBlockAcquireFailures;
  }

  /**
   * The wait time is calculated using a grace period, a time window, and a
   * random factor applied to that time window. With each subsequent failure,
   * the grace period expands to the maximum value of the previous time window,
   * and the time window upper limit expands by a constant exponential multiplier.
   * The first retry has a grace period of 0ms.
   *
   * Concretely, for {@code n} failures the result is
   * {@code backoff(n) + backoff(n + 1) * random}, where
   * {@code backoff(k) = base * multiplier^k * k}, limited to the range
   * {@code [0, max]}, and {@code random} is in {@code [0, 1)}. The cap is
   * applied to the grace period and to the window separately, so the returned
   * value can approach twice the configured maximum.
   *
   * With default settings, the first failure will result in a wait time of a
   * random number between 0 and 3000ms. The second failure will have a grace
   * period of 3000ms, and an additional wait time of a random number between 0 and
   * 6000ms. Subsequent failures will expand to 6000ms grace period and 0 - 9000ms,
   * then 9000ms grace and 0 - 12000ms, etc.
   *
   * This behavior can be made more or less aggressive by configuring the base
   * value (default 3000ms) and constant exponential multiplier (default 1). For
   * example, a base of 10 and multiplier 5 could result in one very fast retry that
   * quickly backs off in case of multiple failures. This may be useful for low
   * latency applications. One downside with high multipliers is how quickly the
   * backoff can get to very high numbers. One can further customize this by setting
   * a maximum window size to cap.
   *
   * @param numFailures number of failures seen so far
   * @return the time to wait in milliseconds; never negative and never NaN
   */
  public double getWaitTime(int numFailures) {
    double gracePeriod = backoff(numFailures);
    double waitTimeWithRandomFactor = backoff(numFailures + 1) * getRandomFactor();

    return gracePeriod + waitTimeWithRandomFactor;
  }

  /**
   * Computes the window for the given failure count, limited to
   * {@code [0, timeWindowMax]}.
   */
  private double backoff(int failures) {
    double window = timeWindowBase * Math.pow(timeWindowMultiplier, failures) * failures;
    double capped = Math.min(window, timeWindowMax);
    // A negative base, multiplier or maximum would otherwise produce a negative
    // wait time, which Thread.sleep rejects with IllegalArgumentException.
    // NaN (e.g. 0 * Infinity) fails the comparison and also collapses to 0.
    return capped > 0 ? capped : 0;
  }

  /**
   * Returns a random value in [0, 1), or 1 when randomness is disabled.
   */
  private double getRandomFactor() {
    if (enableRandom) {
      return ThreadLocalRandom.current().nextDouble();
    } else {
      return 1;
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1653,6 +1653,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
@Deprecated
public static final int DFS_CLIENT_RETRY_WINDOW_BASE_DEFAULT
= HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_RETRY_WINDOW_MULTIPLIER
= HdfsClientConfigKeys.Retry.WINDOW_MULTIPLIER_KEY;
@Deprecated
public static final int DFS_CLIENT_RETRY_WINDOW_MULTIPLIER_DEFAULT
= HdfsClientConfigKeys.Retry.WINDOW_MULTIPLIER_DEFAULT;
@Deprecated
public static final String DFS_CLIENT_RETRY_WINDOW_MAXIMUM
= HdfsClientConfigKeys.Retry.WINDOW_MAXIMUM_KEY;
@Deprecated
public static final int DFS_CLIENT_RETRY_WINDOW_MAXIMUM_DEFAULT
= HdfsClientConfigKeys.Retry.WINDOW_MAXIMUM_DEFAULT;

// dfs.client.failover confs are moved to HdfsClientConfigKeys.Failover
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4440,8 +4440,28 @@
<value>3000</value>
<description>
Base time window in ms for DFSClient retries. For each retry attempt,
this value is extended linearly (e.g. 3000 ms for first attempt and
first retry, 6000 ms for second retry, 9000 ms for third retry, etc.).
this value is extended exponentially based on dfs.client.retry.window.multiplier.
</description>
</property>

<property>
<name>dfs.client.retry.window.multiplier</name>
<value>1</value>
<description>
Multiplier for extending the retry time window. For each retry attempt,
the retry time window is dfs.client.retry.window.base multiplied by the current
failure count and by this multiplier raised to the power of the current failure count.
The default value of 1 means the window will expand linearly (e.g. 3000 ms for first
attempt and first retry, 6000 ms for second retry, 9000 ms for third retry, etc.).
</description>
</property>

<property>
<name>dfs.client.retry.window.max</name>
<value>30000</value>
<description>
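Maximum value in ms that the retry window can be expanded to. Once the maximum has been reached,
the retry window will be a constant value until the maximum retries are exhausted. The same cap
applies to the grace period that precedes the window, so the total wait for a single retry
can approach twice this value.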
</description>
</property>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,9 @@ public void testFailuresArePerOperation() throws Exception
NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
NamenodeProtocols spyNN = spy(preSpyNN);
DFSClient client = new DFSClient(null, spyNN, conf, null);
int maxBlockAcquires = client.getConf().getMaxBlockAcquireFailures();
int maxBlockAcquires = client.getConf()
.getFetchBlockLocationsRetryer()
.getMaxBlockAcquireFailures();
assertTrue(maxBlockAcquires > 0);


Expand Down
Loading