HDFS-7835. make initial sleeptime in locateFollowingBlock configurable for DFSClient. Contributed by Zhihai Xu.
Yongjun Zhang committed Mar 20, 2015
1 parent 43dde50 commit 1561231
Showing 6 changed files with 52 additions and 4 deletions.
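For context, the new key behaves like any other client-side setting and can be supplied on the Configuration used to create the FileSystem. The sketch below is a minimal, hypothetical usage example; the namenode URI and file path are placeholders and are not part of this commit.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class InitialDelayExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Raise the first locateFollowingBlock retry delay from the default
    // 400 ms to 1000 ms; later retries still double the delay each time.
    conf.setInt(
        "dfs.client.block.write.locateFollowingBlock.initial.delay.ms", 1000);
    // "hdfs://namenode:8020" and "/tmp/example" are placeholder values.
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf)) {
      fs.create(new Path("/tmp/example")).close();
    }
  }
}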
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -323,6 +323,9 @@ Release 2.8.0 - UNRELEASED


HDFS-2360. Ugly stacktrace when quota exceeds. (harsh)


HDFS-7835. make initial sleeptime in locateFollowingBlock configurable for
DFSClient. (Zhihai Xu via Yongjun Zhang)

OPTIMIZATIONS


BUG FIXES
@@ -24,6 +24,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
@@ -305,6 +307,7 @@ public static class Conf {
final int nCachedConnRetry;
final int nBlockWriteRetry;
final int nBlockWriteLocateFollowingRetry;
final int blockWriteLocateFollowingInitialDelayMs;
final long defaultBlockSize;
final long prefetchSize;
final short defaultReplication;
@@ -416,6 +419,9 @@ public Conf(Configuration conf) {
nBlockWriteLocateFollowingRetry = conf.getInt(
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
blockWriteLocateFollowingInitialDelayMs = conf.getInt(
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY,
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT);
uMask = FsPermission.getUMask(conf);
connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
@@ -566,6 +572,11 @@ private DataChecksum createChecksum(ChecksumOpt userOpt) {
}
return dataChecksum;
}

@VisibleForTesting
public int getBlockWriteLocateFollowingInitialDelayMs() {
return blockWriteLocateFollowingInitialDelayMs;
}
}


public Conf getConf() {
@@ -399,6 +399,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
// Much code in hdfs is not yet updated to use these keys.
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
public static final int DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT = 5;
// The initial delay (in ms) for locateFollowingBlock; the delay doubles on each retry.
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY = "dfs.client.block.write.locateFollowingBlock.initial.delay.ms";
public static final int DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_DEFAULT = 400;
public static final String DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY = "dfs.client.block.write.retries";
public static final int DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT = 3;
public static final String DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY = "dfs.client.max.block.acquire.failures";
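The comment on the new key says the delay starts at the configured value and doubles on each retry. The resulting schedule is easy to see with a small stand-alone sketch; the class name is invented for illustration and the values shown are the defaults of the two related keys:

public class BackoffSchedule {
  public static void main(String[] args) {
    int initialDelayMs = 400; // ...locateFollowingBlock.initial.delay.ms default
    int retries = 5;          // ...locateFollowingBlock.retries default
    long delay = initialDelayMs;
    for (int attempt = 1; attempt <= retries; attempt++) {
      System.out.println("retry " + attempt + ": sleep " + delay + " ms");
      delay *= 2; // the client doubles the delay after each retry
    }
    // With the defaults this prints 400, 800, 1600, 3200, 6400 ms.
  }
}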
@@ -1433,7 +1433,8 @@ private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
private LocatedBlock locateFollowingBlock(long start,
DatanodeInfo[] excludedNodes) throws IOException {
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
long sleeptime = 400;
long sleeptime = dfsClient.getConf().
blockWriteLocateFollowingInitialDelayMs;
while (true) {
long localstart = Time.now();
while (true) {
@@ -2253,7 +2254,8 @@ private synchronized void closeImpl() throws IOException {
// be called during unit tests
private void completeFile(ExtendedBlock last) throws IOException {
long localstart = Time.now();
long localTimeout = 400;
long sleeptime = dfsClient.getConf().
blockWriteLocateFollowingInitialDelayMs;
boolean fileComplete = false;
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
while (!fileComplete) {
@@ -2276,8 +2278,8 @@ private void completeFile(ExtendedBlock last) throws IOException {
+ " does not have enough number of replicas."); + " does not have enough number of replicas.");
} }
retries--; retries--;
Thread.sleep(localTimeout);
Thread.sleep(sleeptime);
localTimeout *= 2;
sleeptime *= 2;
if (Time.now() - localstart > 5000) {
DFSClient.LOG.info("Could not complete " + src + " retrying...");
}
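Stripped of the surrounding class, both locateFollowingBlock and completeFile follow the same pattern: a bounded retry loop whose first sleep now comes from configuration instead of a hard-coded 400 ms. The following is a simplified sketch of that pattern, not the actual DFSOutputStream code; RetryableCall and RetryLaterException are stand-ins for the real namenode RPC and its "try again later" exceptions (such as NotReplicatedYetException):

public final class RetryWithBackoff {

  /** Stand-in for a namenode call that may ask the client to retry later. */
  public interface RetryableCall<T> {
    T call() throws Exception;
  }

  /** Stand-in for "not ready yet" errors such as NotReplicatedYetException. */
  public static class RetryLaterException extends Exception {}

  public static <T> T run(RetryableCall<T> call, int retries, long initialDelayMs)
      throws Exception {
    long sleeptime = initialDelayMs; // previously hard-coded to 400 ms
    while (true) {
      try {
        return call.call();
      } catch (RetryLaterException e) {
        if (retries-- <= 0) {
          throw e; // out of retries; surface the failure
        }
        Thread.sleep(sleeptime);
        sleeptime *= 2; // double the delay before the next attempt
      }
    }
  }
}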
@@ -2314,4 +2314,11 @@
<description>Whether pin blocks on favored DataNode.</description>
</property>


<property>
<name>dfs.client.block.write.locateFollowingBlock.initial.delay.ms</name>
<value>400</value>
<description>The initial delay (in milliseconds) for locateFollowingBlock;
the delay doubles on each retry.
</description>
</property>
</configuration>
@@ -1131,4 +1131,26 @@ static void parseMultipleLinearRandomRetry(String expected, String s) {
assertEquals("MultipleLinearRandomRetry" + expected, r.toString()); assertEquals("MultipleLinearRandomRetry" + expected, r.toString());
} }
} }

@Test
public void testDFSClientConfigurationLocateFollowingBlockInitialDelay()
throws Exception {
// test if DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY
// is not configured, verify DFSClient uses the default value 400.
Configuration dfsConf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(dfsConf).build();
cluster.waitActive();
NamenodeProtocols nn = cluster.getNameNodeRpc();
DFSClient client = new DFSClient(null, nn, dfsConf, null);
assertEquals(client.getConf().
getBlockWriteLocateFollowingInitialDelayMs(), 400);

// change DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY,
// verify DFSClient uses the configured value 1000.
dfsConf.setInt(DFSConfigKeys.
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_INITIAL_DELAY_KEY, 1000);
client = new DFSClient(null, nn, dfsConf, null);
assertEquals(client.getConf().
getBlockWriteLocateFollowingInitialDelayMs(), 1000);
}
}
