Skip to content

Commit

Permalink
HDFS-8143. Mover should exit after some retry when failed to move blo…
Browse files Browse the repository at this point in the history
…cks. Contributed by surendra singh lilhore
  • Loading branch information
Tsz-Wo Nicholas Sze committed May 13, 2015
1 parent 065d8f2 commit cdec12d
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 6 deletions.
Expand Up @@ -353,6 +353,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
public static final String DFS_MOVER_MOVERTHREADS_KEY = "dfs.mover.moverThreads";
public static final int DFS_MOVER_MOVERTHREADS_DEFAULT = 1000;
public static final String DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY = "dfs.mover.retry.max.attempts";
public static final int DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT = 10;

public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
public static final int DFS_DATANODE_DEFAULT_PORT = 50010;
Expand Down
Expand Up @@ -58,6 +58,7 @@
import java.net.URI;
import java.text.DateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

@InterfaceAudience.Private
public class Mover {
Expand Down Expand Up @@ -107,10 +108,12 @@ private List<StorageGroup> getTargetStorages(StorageType t) {
private final Dispatcher dispatcher;
private final StorageMap storages;
private final List<Path> targetPaths;
private final int retryMaxAttempts;
private final AtomicInteger retryCount;

private final BlockStoragePolicy[] blockStoragePolicies;

Mover(NameNodeConnector nnc, Configuration conf) {
Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount) {
final long movedWinWidth = conf.getLong(
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
Expand All @@ -120,7 +123,10 @@ private List<StorageGroup> getTargetStorages(StorageType t) {
final int maxConcurrentMovesPerNode = conf.getInt(
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);

this.retryMaxAttempts = conf.getInt(
DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT);
this.retryCount = retryCount;
this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
maxConcurrentMovesPerNode, conf);
Expand Down Expand Up @@ -255,14 +261,27 @@ private boolean isSnapshotPathInCurrent(String path) throws IOException {
* @return whether there is still remaining migration work for the next
* round
*/
private boolean processNamespace() {
private boolean processNamespace() throws IOException {
getSnapshottableDirs();
boolean hasRemaining = false;
for (Path target : targetPaths) {
hasRemaining |= processPath(target.toUri().getPath());
}
// wait for pending move to finish and retry the failed migration
hasRemaining |= Dispatcher.waitForMoveCompletion(storages.targets.values());
boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
.values());
if (hasFailed) {
if (retryCount.get() == retryMaxAttempts) {
throw new IOException("Failed to move some block's after "
+ retryMaxAttempts + " retries.");
} else {
retryCount.incrementAndGet();
}
} else {
// Reset retry count if no failure.
retryCount.set(0);
}
hasRemaining |= hasFailed;
return hasRemaining;
}

Expand Down Expand Up @@ -528,6 +547,7 @@ static int run(Map<URI, List<Path>> namenodes, Configuration conf)
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
AtomicInteger retryCount = new AtomicInteger(0);
LOG.info("namenodes = " + namenodes);

List<NameNodeConnector> connectors = Collections.emptyList();
Expand All @@ -541,7 +561,7 @@ static int run(Map<URI, List<Path>> namenodes, Configuration conf)
Iterator<NameNodeConnector> iter = connectors.iterator();
while (iter.hasNext()) {
NameNodeConnector nnc = iter.next();
final Mover m = new Mover(nnc, conf);
final Mover m = new Mover(nnc, conf, retryCount);
final ExitStatus r = m.run();

if (r == ExitStatus.SUCCESS) {
Expand Down
Expand Up @@ -20,12 +20,14 @@
import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
Expand All @@ -34,6 +36,7 @@
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
Expand All @@ -54,7 +57,7 @@ static Mover newMover(Configuration conf) throws IOException {
final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
nnMap, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
return new Mover(nncs.get(0), conf);
return new Mover(nncs.get(0), conf, new AtomicInteger(0));
}

@Test
Expand Down Expand Up @@ -324,4 +327,38 @@ public void testTwoReplicaSameStorageTypeShouldNotSelect() throws Exception {
cluster.shutdown();
}
}

@Test
public void testMoverFailedRetry() throws Exception {
// HDFS-8147
final Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2");
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.storageTypes(
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE}}).build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoverFailedRetry";
// write to DISK
final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
out.writeChars("testMoverFailedRetry");
out.close();

// Delete block file so, block move will fail with FileNotFoundException
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
cluster.corruptBlockOnDataNodesByDeletingBlockFile(lb.getBlock());
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", file.toString()});
Assert.assertEquals("Movement should fail after some retry",
ExitStatus.IO_EXCEPTION.getExitCode(), rc);
} finally {
cluster.shutdown();
}
}
}

0 comments on commit cdec12d

Please sign in to comment.