Skip to content

Commit

Permalink
HDFS-8540. Mover should exit with NO_MOVE_BLOCK if no block can be mo…
Browse files Browse the repository at this point in the history
…ved. Contributed by surendra singh lilhore
  • Loading branch information
Tsz-Wo Nicholas Sze committed Jun 15, 2015
1 parent 2cb09e9 commit 321940c
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 38 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -917,6 +917,9 @@ Release 2.7.1 - UNRELEASED
HDFS-8521. Add VisibleForTesting annotation to
BlockPoolSlice#selectReplicaToDelete. (cmccabe)

HDFS-8540. Mover should exit with NO_MOVE_BLOCK if no block can be moved.
(surendra singh lilhore via szetszwo)

OPTIMIZATIONS

BUG FIXES
Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
Expand Down Expand Up @@ -163,8 +162,7 @@ private void initStoragePolicies() throws IOException {
private ExitStatus run() {
try {
init();
boolean hasRemaining = new Processor().processNamespace();
return hasRemaining ? ExitStatus.IN_PROGRESS : ExitStatus.SUCCESS;
return new Processor().processNamespace().getExitStatus();
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.ILLEGAL_ARGUMENTS;
Expand Down Expand Up @@ -262,11 +260,11 @@ private boolean isSnapshotPathInCurrent(String path) throws IOException {
* @return whether there is still remaining migration work for the next
* round
*/
private boolean processNamespace() throws IOException {
private Result processNamespace() throws IOException {
getSnapshottableDirs();
boolean hasRemaining = false;
Result result = new Result();
for (Path target : targetPaths) {
hasRemaining |= processPath(target.toUri().getPath());
processPath(target.toUri().getPath(), result);
}
// wait for pending move to finish and retry the failed migration
boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
Expand All @@ -282,90 +280,87 @@ private boolean processNamespace() throws IOException {
// Reset retry count if no failure.
retryCount.set(0);
}
hasRemaining |= hasFailed;
return hasRemaining;
result.updateHasRemaining(hasFailed);
return result;
}

/**
* @return whether there is still remaing migration work for the next
* round
*/
private boolean processPath(String fullPath) {
boolean hasRemaining = false;
private void processPath(String fullPath, Result result) {
for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
final DirectoryListing children;
try {
children = dfs.listPaths(fullPath, lastReturnedName, true);
} catch(IOException e) {
LOG.warn("Failed to list directory " + fullPath
+ ". Ignore the directory and continue.", e);
return hasRemaining;
return;
}
if (children == null) {
return hasRemaining;
return;
}
for (HdfsFileStatus child : children.getPartialListing()) {
hasRemaining |= processRecursively(fullPath, child);
processRecursively(fullPath, child, result);
}
if (children.hasMore()) {
lastReturnedName = children.getLastName();
} else {
return hasRemaining;
return;
}
}
}

/** @return whether the migration requires next round */
private boolean processRecursively(String parent, HdfsFileStatus status) {
private void processRecursively(String parent, HdfsFileStatus status,
Result result) {
String fullPath = status.getFullName(parent);
boolean hasRemaining = false;
if (status.isDir()) {
if (!fullPath.endsWith(Path.SEPARATOR)) {
fullPath = fullPath + Path.SEPARATOR;
}

hasRemaining = processPath(fullPath);
processPath(fullPath, result);
// process snapshots if this is a snapshottable directory
if (snapshottableDirs.contains(fullPath)) {
final String dirSnapshot = fullPath + HdfsConstants.DOT_SNAPSHOT_DIR;
hasRemaining |= processPath(dirSnapshot);
processPath(dirSnapshot, result);
}
} else if (!status.isSymlink()) { // file
try {
if (!isSnapshotPathInCurrent(fullPath)) {
// the full path is a snapshot path but it is also included in the
// current directory tree, thus ignore it.
hasRemaining = processFile(fullPath, (HdfsLocatedFileStatus)status);
processFile(fullPath, (HdfsLocatedFileStatus) status, result);
}
} catch (IOException e) {
LOG.warn("Failed to check the status of " + parent
+ ". Ignore it and continue.", e);
return false;
}
}
return hasRemaining;
}

/** @return true if it is necessary to run another round of migration */
private boolean processFile(String fullPath, HdfsLocatedFileStatus status) {
private void processFile(String fullPath, HdfsLocatedFileStatus status,
Result result) {
final byte policyId = status.getStoragePolicy();
// currently we ignore files with unspecified storage policy
if (policyId == HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
return false;
return;
}
final BlockStoragePolicy policy = blockStoragePolicies[policyId];
if (policy == null) {
LOG.warn("Failed to get the storage policy of file " + fullPath);
return false;
return;
}
final List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());

final LocatedBlocks locatedBlocks = status.getBlockLocations();
boolean hasRemaining = false;
final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks();
for(int i = 0; i < lbs.size(); i++) {
for (int i = 0; i < lbs.size(); i++) {
if (i == lbs.size() - 1 && !lastBlkComplete) {
// last block is incomplete, skip it
continue;
Expand All @@ -375,12 +370,15 @@ private boolean processFile(String fullPath, HdfsLocatedFileStatus status) {
lb.getStorageTypes());
if (!diff.removeOverlap(true)) {
if (scheduleMoves4Block(diff, lb)) {
hasRemaining |= (diff.existing.size() > 1 &&
diff.expected.size() > 1);
result.updateHasRemaining(diff.existing.size() > 1
&& diff.expected.size() > 1);
// One block scheduled successfully, set noBlockMoved to false
result.setNoBlockMoved(false);
} else {
result.updateHasRemaining(true);
}
}
}
return hasRemaining;
}

boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
Expand Down Expand Up @@ -711,6 +709,45 @@ public int run(String[] args) throws Exception {
}
}

private static class Result {

private boolean hasRemaining;
private boolean noBlockMoved;

Result() {
hasRemaining = false;
noBlockMoved = true;
}

boolean isHasRemaining() {
return hasRemaining;
}

boolean isNoBlockMoved() {
return noBlockMoved;
}

void updateHasRemaining(boolean hasRemaining) {
this.hasRemaining |= hasRemaining;
}

void setNoBlockMoved(boolean noBlockMoved) {
this.noBlockMoved = noBlockMoved;
}

/**
* @return SUCCESS if all moves are success and there is no remaining move.
* Return NO_MOVE_BLOCK if there moves available but all the moves
* cannot be scheduled. Otherwise, return IN_PROGRESS since there
* must be some remaining moves.
*/
ExitStatus getExitStatus() {
return !isHasRemaining() ? ExitStatus.SUCCESS
: isNoBlockMoved() ? ExitStatus.NO_MOVE_BLOCK
: ExitStatus.IN_PROGRESS;
}

}
/**
* Run a Mover in command line.
*
Expand Down
Expand Up @@ -328,6 +328,35 @@ public void testTwoReplicaSameStorageTypeShouldNotSelect() throws Exception {
}
}

@Test(timeout = 300000)
public void testMoveWhenStoragePolicyNotSatisfying() throws Exception {
// HDFS-8147
final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.storageTypes(
new StorageType[][] { { StorageType.DISK }, { StorageType.DISK },
{ StorageType.DISK } }).build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoveWhenStoragePolicyNotSatisfying";
// write to DISK
final FSDataOutputStream out = dfs.create(new Path(file));
out.writeChars("testMoveWhenStoragePolicyNotSatisfying");
out.close();

// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] { "-p", file.toString() });
int exitcode = ExitStatus.NO_MOVE_BLOCK.getExitCode();
Assert.assertEquals("Exit code should be " + exitcode, exitcode, rc);
} finally {
cluster.shutdown();
}
}

@Test
public void testMoverFailedRetry() throws Exception {
// HDFS-8147
Expand Down
Expand Up @@ -219,7 +219,7 @@ private void runBasicTest(boolean shutdown) throws Exception {
verify(true);

setStoragePolicy();
migrate();
migrate(ExitStatus.SUCCESS);
verify(true);
} finally {
if (shutdown) {
Expand Down Expand Up @@ -250,8 +250,8 @@ void setStoragePolicy() throws Exception {
/**
* Run the migration tool.
*/
void migrate() throws Exception {
runMover();
void migrate(ExitStatus expectedExitCode) throws Exception {
runMover(expectedExitCode);
Thread.sleep(5000); // let the NN finish deletion
}

Expand All @@ -267,14 +267,14 @@ void verify(boolean verifyAll) throws Exception {
}
}

private void runMover() throws Exception {
private void runMover(ExitStatus expectedExitCode) throws Exception {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Map<URI, List<Path>> nnMap = Maps.newHashMap();
for (URI nn : namenodes) {
nnMap.put(nn, null);
}
int result = Mover.run(nnMap, conf);
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result);
Assert.assertEquals(expectedExitCode.getExitCode(), result);
}

private void verifyNamespace() throws Exception {
Expand Down Expand Up @@ -555,7 +555,7 @@ public void testMigrateOpenFileToArchival() throws Exception {
try {
banner("start data migration");
test.setStoragePolicy(); // set /foo to COLD
test.migrate();
test.migrate(ExitStatus.SUCCESS);

// make sure the under construction block has not been migrated
LocatedBlocks lbs = test.dfs.getClient().getLocatedBlocks(
Expand Down Expand Up @@ -605,7 +605,7 @@ public void testHotWarmColdDirs() throws Exception {
try {
test.runBasicTest(false);
pathPolicyMap.moveAround(test.dfs);
test.migrate();
test.migrate(ExitStatus.SUCCESS);

test.verify(true);
} finally {
Expand Down Expand Up @@ -695,7 +695,7 @@ public void testNoSpaceDisk() throws Exception {
//test move a hot file to warm
final Path file1 = new Path(pathPolicyMap.hot, "file1");
test.dfs.rename(file1, pathPolicyMap.warm);
test.migrate();
test.migrate(ExitStatus.NO_MOVE_BLOCK);
test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());
} finally {
test.shutdownCluster();
Expand Down Expand Up @@ -753,7 +753,7 @@ public void testNoSpaceArchive() throws Exception {
{ //test move a cold file to warm
final Path file1 = new Path(pathPolicyMap.cold, "file1");
test.dfs.rename(file1, pathPolicyMap.warm);
test.migrate();
test.migrate(ExitStatus.SUCCESS);
test.verify(true);
}
} finally {
Expand Down

0 comments on commit 321940c

Please sign in to comment.