diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 6e15251d66b4c..7b64638fac4ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -40,7 +40,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,6 +61,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks; import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.FetchBlockLocationsRetryer; import org.apache.hadoop.hdfs.protocol.BlockType; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -996,10 +996,12 @@ private DNAddrPair chooseDataNode(LocatedBlock block, private LocatedBlock refetchLocations(LocatedBlock block, Collection ignoredNodes) throws IOException { + FetchBlockLocationsRetryer retryer = dfsClient.getConf().getFetchBlockLocationsRetryer(); + String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), dfsClient.getDeadNodes(this), ignoredNodes); String blockInfo = block.getBlock() + " file=" + src; - if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) { + if (retryer.isMaxFailuresExceeded(failures)) { String description = "Could not obtain block: " + blockInfo; DFSClient.LOG.warn(description + errMsg + ". 
Throwing a BlockMissingException"); @@ -1015,21 +1017,7 @@ private LocatedBlock refetchLocations(LocatedBlock block, + " from any node: " + errMsg + ". Will get new block locations from namenode and retry..."); try { - // Introducing a random factor to the wait time before another retry. - // The wait time is dependent on # of failures and a random factor. - // At the first time of getting a BlockMissingException, the wait time - // is a random number between 0..3000 ms. If the first retry - // still fails, we will wait 3000 ms grace period before the 2nd retry. - // Also at the second retry, the waiting window is expanded to 6000 ms - // alleviating the request rate from the server. Similarly the 3rd retry - // will wait 6000ms grace period before retry and the waiting window is - // expanded to 9000ms. - final int timeWindow = dfsClient.getConf().getTimeWindow(); - // grace period for the last round of attempt - double waitTime = timeWindow * failures + - // expanding time window for each failure - timeWindow * (failures + 1) * - ThreadLocalRandom.current().nextDouble(); + double waitTime = retryer.getWaitTime(failures); DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); Thread.sleep((long)waitTime); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 0664f6414ec64..c320691652407 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -333,6 +333,12 @@ interface Retry { String WINDOW_BASE_KEY = PREFIX + "window.base"; int WINDOW_BASE_DEFAULT = 3000; + + String WINDOW_MULTIPLIER_KEY = PREFIX + "window.multiplier"; + int 
WINDOW_MULTIPLIER_DEFAULT = 1; + + String WINDOW_MAXIMUM_KEY = PREFIX + "window.max"; + int WINDOW_MAXIMUM_DEFAULT = 30000; } /** dfs.client.failover configuration properties */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 0864256fd12fd..bba68671f4ec9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; @@ -112,7 +113,7 @@ public class DfsClientConf { private final int maxPipelineRecoveryRetries; private final int failoverSleepBaseMillis; private final int failoverSleepMaxMillis; - private final int maxBlockAcquireFailures; + private final FetchBlockLocationsRetryer fetchBlockLocationsRetryer; private final int datanodeSocketWriteTimeout; private final int ioBufferSize; private final ChecksumOpt defaultChecksumOpt; @@ -124,8 +125,6 @@ public class DfsClientConf { private final int socketTimeout; private final int socketSendBufferSize; private final long excludedNodesCacheExpiry; - /** Wait time window (in msec) if BlockMissingException is caught. 
*/ - private final int timeWindow; private final int numCachedConnRetry; private final int numBlockWriteRetry; private final int numBlockWriteLocateFollowingRetry; @@ -170,9 +169,6 @@ public DfsClientConf(Configuration conf) { maxRetryAttempts = conf.getInt( Retry.MAX_ATTEMPTS_KEY, Retry.MAX_ATTEMPTS_DEFAULT); - timeWindow = conf.getInt( - Retry.WINDOW_BASE_KEY, - Retry.WINDOW_BASE_DEFAULT); retryTimesForGetLastBlockLength = conf.getInt( Retry.TIMES_GET_LAST_BLOCK_LENGTH_KEY, Retry.TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT); @@ -190,9 +186,8 @@ public DfsClientConf(Configuration conf) { Failover.SLEEPTIME_MAX_KEY, Failover.SLEEPTIME_MAX_DEFAULT); - maxBlockAcquireFailures = conf.getInt( - DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, - DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT); + fetchBlockLocationsRetryer = new FetchBlockLocationsRetryer(conf); + datanodeSocketWriteTimeout = conf.getInt( DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, HdfsConstants.WRITE_TIMEOUT); @@ -448,13 +443,6 @@ public int getFailoverSleepMaxMillis() { return failoverSleepMaxMillis; } - /** - * @return the maxBlockAcquireFailures - */ - public int getMaxBlockAcquireFailures() { - return maxBlockAcquireFailures; - } - /** * @return the datanodeSocketWriteTimeout */ @@ -540,10 +528,11 @@ public long getExcludedNodesCacheExpiry() { } /** - * @return the timeWindow + * Returns the retry policy used when refetching block locations. + * @return the fetchBlockLocationsRetryer */ - public int getTimeWindow() { - return timeWindow; + public FetchBlockLocationsRetryer getFetchBlockLocationsRetryer() { + return fetchBlockLocationsRetryer; } /** @@ -994,4 +983,98 @@ public String confAsString() { + domainSocketDisableIntervalSeconds; } } + + /** + * Holds the failure limit and calculates the wait time used when refetching block locations. 
+ */ + public static class FetchBlockLocationsRetryer { + private final int maxBlockAcquireFailures; + private final int timeWindowBase; + private final int timeWindowMultiplier; + private final int timeWindowMax; + private final boolean enableRandom; + + public FetchBlockLocationsRetryer(Configuration conf) { + this(conf, true); + } + + /** + * It helps for testing to be able to disable the random factor. It should remain + * enabled for non-test use. + */ + @VisibleForTesting + FetchBlockLocationsRetryer(Configuration conf, boolean enableRandom) { + maxBlockAcquireFailures = conf.getInt( + DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, + DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT); + timeWindowBase = conf.getInt( + Retry.WINDOW_BASE_KEY, + Retry.WINDOW_BASE_DEFAULT); + timeWindowMultiplier = conf.getInt( + Retry.WINDOW_MULTIPLIER_KEY, + Retry.WINDOW_MULTIPLIER_DEFAULT); + timeWindowMax = conf.getInt( + Retry.WINDOW_MAXIMUM_KEY, + Retry.WINDOW_MAXIMUM_DEFAULT + ); + this.enableRandom = enableRandom; + } + + /** + * For tests, exposes the maximum allowed failures. + */ + @VisibleForTesting + public int getMaxBlockAcquireFailures() { + return maxBlockAcquireFailures; + } + + /** + * Returns whether the passed number of failures is greater than or equal to the maximum + * allowed failures. + */ + public boolean isMaxFailuresExceeded(int numFailures) { + return numFailures >= maxBlockAcquireFailures; + } + + /** + * The wait time is calculated using a grace period, a time window, and a + * random factor applied to that time window. With each subsequent failure, + * the grace period expands to the maximum value of the previous time window, + * and the time window upper limit becomes base * n * multiplier^n for the n-th failure. + * The first retry has a grace period of 0ms. + * + * With default settings, the first failure will result in a wait time of a + * random number between 0 and 3000ms. 
The second failure will have a grace + * period of 3000ms, and an additional wait time of a random number between 0 and + * 6000ms. Subsequent failures will expand to 6000ms grace period and 0 - 9000ms, + * then 9000ms grace and 0 - 12000ms, etc. + * + * This behavior can be made more or less aggressive by configuring the base + * value (default 3000ms) and constant exponential multiplier (default 1). For + * example, a base of 10 and multiplier 5 could result in one very fast retry that + * quickly backs off in case of multiple failures. This may be useful for low + * latency applications. One downside with high multipliers is how quickly the + * backoff can get to very high numbers. One can further customize this by setting + * a maximum window size, which caps the grace period and the window individually. + */ + public double getWaitTime(int numFailures) { + double gracePeriod = backoff(numFailures); + double waitTimeWithRandomFactor = backoff(numFailures + 1) * getRandomFactor(); + + return gracePeriod + waitTimeWithRandomFactor; + } + + private double backoff(int failures) { + double window = timeWindowBase * Math.pow(timeWindowMultiplier, failures) * failures; + return Math.max(0.0, Math.min(window, timeWindowMax)); // sleep time must be >= 0 + } + + private double getRandomFactor() { + if (enableRandom) { + return ThreadLocalRandom.current().nextDouble(); + } else { + return 1; + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 808ecfbe0c44e..955e578bdd737 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1653,6 +1653,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys { @Deprecated public static final int DFS_CLIENT_RETRY_WINDOW_BASE_DEFAULT = 
HdfsClientConfigKeys.Retry.WINDOW_BASE_DEFAULT; + @Deprecated + public static final 
String DFS_CLIENT_RETRY_WINDOW_MULTIPLIER + = HdfsClientConfigKeys.Retry.WINDOW_MULTIPLIER_KEY; + @Deprecated + public static final int DFS_CLIENT_RETRY_WINDOW_MULTIPLIER_DEFAULT + = HdfsClientConfigKeys.Retry.WINDOW_MULTIPLIER_DEFAULT; + @Deprecated + public static final String DFS_CLIENT_RETRY_WINDOW_MAXIMUM + = HdfsClientConfigKeys.Retry.WINDOW_MAXIMUM_KEY; + @Deprecated + public static final int DFS_CLIENT_RETRY_WINDOW_MAXIMUM_DEFAULT + = HdfsClientConfigKeys.Retry.WINDOW_MAXIMUM_DEFAULT; // dfs.client.failover confs are moved to HdfsClientConfigKeys.Failover @Deprecated diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 5818ae1812836..b27e75d7ec084 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4440,8 +4440,28 @@ 3000 Base time window in ms for DFSClient retries. For each retry attempt, - this value is extended linearly (e.g. 3000 ms for first attempt and - first retry, 6000 ms for second retry, 9000 ms for third retry, etc.). + this value is extended based on the failure count and dfs.client.retry.window.multiplier. + + + + + dfs.client.retry.window.multiplier + 1 + + Multiplier for extending the retry time window. For each retry attempt, + the retry time window is extended by multiplying dfs.client.retry.window.base + by the current failure count and by this multiplier raised to the power of that count. The default + value of 1 means the window will expand linearly (e.g. 3000 ms for first attempt + and first retry, 6000 ms for second retry, 9000 ms for third retry, etc.). + + + + + dfs.client.retry.window.max + 30000 + + Maximum value that the retry window can be expanded to. Once the maximum has been reached, + the retry window will be a constant value until the maximum retries are exhausted. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java index 970003b0e58cc..396a2c264d241 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java @@ -312,7 +312,9 @@ public void testFailuresArePerOperation() throws Exception NamenodeProtocols preSpyNN = cluster.getNameNodeRpc(); NamenodeProtocols spyNN = spy(preSpyNN); DFSClient client = new DFSClient(null, spyNN, conf, null); - int maxBlockAcquires = client.getConf().getMaxBlockAcquireFailures(); + int maxBlockAcquires = client.getConf() + .getFetchBlockLocationsRetryer() + .getMaxBlockAcquireFailures(); assertTrue(maxBlockAcquires > 0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestFetchBlockLocationsRetryer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestFetchBlockLocationsRetryer.java new file mode 100644 index 0000000000000..66ab92b4dd9b9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestFetchBlockLocationsRetryer.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf.FetchBlockLocationsRetryer; +import org.junit.Test; + +public class TestFetchBlockLocationsRetryer { + + private static final double EPSILON = 0.001; + + @Test + public void testIsMaxFailuresExceeded() { + Configuration conf = new Configuration(); + + conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 3); + FetchBlockLocationsRetryer retryer = new FetchBlockLocationsRetryer(conf); + + assertFalse(retryer.isMaxFailuresExceeded(1)); + assertTrue(retryer.isMaxFailuresExceeded(3)); + assertTrue(retryer.isMaxFailuresExceeded(5)); + } + + @Test + public void testDefaultRetryPolicy() { + Configuration conf = new Configuration(); + int base = Retry.WINDOW_BASE_DEFAULT; + int multiplier = Retry.WINDOW_MULTIPLIER_DEFAULT; + + conf.setInt(Retry.WINDOW_BASE_KEY, base); + conf.setInt(Retry.WINDOW_MULTIPLIER_KEY, multiplier); + + // disable random factor so it's easier to test + FetchBlockLocationsRetryer retryer = new FetchBlockLocationsRetryer(conf, false); + + // with the random factor disabled, each wait is the worst case: the grace + // period plus the full window (e.g. 
3000 grace + 6000 window after one failure) + assertEquals(3_000, retryer.getWaitTime(0), EPSILON); + 
assertEquals(3_000 + 6_000, retryer.getWaitTime(1), EPSILON); + assertEquals(6_000 + 9_000, retryer.getWaitTime(2), EPSILON); + assertEquals(9_000 + 12_000, retryer.getWaitTime(3), EPSILON); + } + + @Test + public void testAggressiveRetryPolicy() { + Configuration conf = new Configuration(); + int base = 10; + int multiplier = 5; + + conf.setInt(Retry.WINDOW_BASE_KEY, base); + conf.setInt(Retry.WINDOW_MULTIPLIER_KEY, multiplier); + + // disable random factor so it's easier to test + FetchBlockLocationsRetryer retryer = new FetchBlockLocationsRetryer(conf, false); + + // with the random factor disabled, each wait is grace + full window, where the + // window for the n-th failure is base * n * multiplier^n (10 * 1 * 5 = 50, 10 * 2 * 25 = 500) + assertEquals(50, retryer.getWaitTime(0), EPSILON); + assertEquals(50 + 500, retryer.getWaitTime(1), EPSILON); + assertEquals(500 + 3_750, retryer.getWaitTime(2), EPSILON); + assertEquals(3_750 + 25_000, retryer.getWaitTime(3), EPSILON); + } + + @Test + public void testMaxWindowSize() { + Configuration conf = new Configuration(); + int base = 10; + int multiplier = 10; + int maxWindow = 1_000; + + conf.setInt(Retry.WINDOW_BASE_KEY, base); + conf.setInt(Retry.WINDOW_MULTIPLIER_KEY, multiplier); + conf.setInt(Retry.WINDOW_MAXIMUM_KEY, maxWindow); + + // disable random factor so it's easier to test + FetchBlockLocationsRetryer retryer = new FetchBlockLocationsRetryer(conf, false); + + // with the random factor disabled these are the worst case wait times; the grace + // period and the window are each capped at maxWindow, hence 1_000 + 1_000 + assertEquals(100, retryer.getWaitTime(0), EPSILON); + assertEquals(100 + 1_000, retryer.getWaitTime(1), EPSILON); + assertEquals(1_000 + 1_000, retryer.getWaitTime(2), EPSILON); + assertEquals(1_000 + 1_000, retryer.getWaitTime(3), EPSILON); + } +}