Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
}
} else {
expectedStorageTypes = existingStoragePolicy
.chooseStorageTypes(fileInfo.getReplication());
.chooseStorageTypes((short) blockInfo.getLocations().length);
}

List<StorageType> existing = new LinkedList<StorageType>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
Expand All @@ -59,6 +60,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
Expand All @@ -82,6 +84,7 @@
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
Expand All @@ -95,6 +98,7 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.ExitUtil;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -1851,4 +1855,114 @@ public void testExternalSPSMetricsExposedToJMX() throws Exception {
shutdownCluster();
}
}

private void writeConfigFile(FileSystem localFs, Path name, List<String> nodes)
throws IOException {
// delete if it already exists
if (localFs.exists(name)) {
localFs.delete(name, true);
}

FSDataOutputStream stm = localFs.create(name);

if (nodes != null) {
for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
String node = it.next();
stm.writeBytes(node);
stm.writeBytes("\n");
}
}
stm.close();
}

private void waitNodeState(DatanodeInfo node, DatanodeInfo.AdminStates state) {
boolean done = state == node.getAdminState();
while (!done) {
LOG.info("Waiting for node " + node + " to change state to " + state
+ " current state: " + node.getAdminState());
try {
Thread.sleep(1 * 500);
} catch (InterruptedException e) {
// nothing
}
done = state == node.getAdminState();
}
LOG.info("node " + node + " reached the state " + state);
}

@Test
public void testSPSWithDecommission() throws Exception {
try {
FileSystem localFs = FileSystem.getLocal(config);
Path workingDir = localFs.getWorkingDirectory();
Path decommissionDir = new Path(workingDir,
PathUtils.getTestDirName(getClass()) + "/work-dir/decommission");
Path hostsFile = new Path(decommissionDir, "hosts");
Path excludeFile = new Path(decommissionDir, "exclude");
writeConfigFile(localFs, hostsFile, null);
writeConfigFile(localFs, excludeFile, null);
config.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
config.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());

// start cluster with 5 dn
final int numOfDn = 5;
StorageType[][] newtypes = new StorageType[][]{
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK}};
hdfsCluster = startCluster(config, newtypes, numOfDn, 2, CAPACITY);
hdfsCluster.waitActive();

// write file with 3 replication [DISK, DISK, DISK]
dfs = hdfsCluster.getFileSystem();
String file = "/testSPSWithDecommissionFile";
Path filePath = new Path(file);
writeContent(file, (short)3);

// select 2 dn for decommission
ArrayList<String> excludedList = new ArrayList<>();
DFSClient client = new DFSClient(hdfsCluster.getNameNode(0).
getNameNodeAddress(), config);
DatanodeInfo[] info = client.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
int count = 0;
int total = 2;
int [] arr = new int [total];
for (int j = 0, i = 0; i < info.length; ++i) {
if (count < total && info[i].getNumBlocks() != 0) {
excludedList.add(info[i].getXferAddr());
count++;
arr[j++] = i;
}
}

// before decommission, set path with ALL_SSD storage policy
dfs.setStoragePolicy(filePath, "ALL_SSD");

// doing decommission, the replications will become [DISK, DISK, DISK, SSD, SSD]
writeConfigFile(localFs, excludeFile, excludedList);
hdfsCluster.getNamesystem(0)
.getBlockManager()
.getDatanodeManager()
.refreshNodes(config);
for (int index = 0; index < arr.length; ++index) {
DatanodeInfo ret = NameNodeAdapter
.getDatanode(hdfsCluster.getNamesystem(0), info[arr[index]]);
waitNodeState(ret, DatanodeInfo.AdminStates.DECOMMISSIONED);
}

// after decommission, set path with HOT storage policy
dfs.setStoragePolicy(filePath, "HOT");
dfs.satisfyStoragePolicy(filePath);

// the replications should not with SSD type
hdfsCluster.triggerHeartbeats();
DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 5, 30000,
dfs);
} finally {
shutdownCluster();
}
}

}