From ff3d47bea3656b93dc0649b6e624f41fb9000d2c Mon Sep 17 00:00:00 2001
From: liubingxing
Date: Tue, 10 May 2022 12:02:42 +0800
Subject: [PATCH] HDFS-16575. [SPS] Should use real replication num instead of
 getReplication from namenode

---
 .../namenode/sps/StoragePolicySatisfier.java  |   2 +-
 .../TestExternalStoragePolicySatisfier.java   | 114 ++++++++++++++++++
 2 files changed, 115 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 3efe6b1cd6d86..4410cc8017328 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -402,7 +402,7 @@ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
         }
       } else {
         expectedStorageTypes = existingStoragePolicy
-            .chooseStorageTypes(fileInfo.getReplication());
+            .chooseStorageTypes((short) blockInfo.getLocations().length);
       }
 
       List<StorageType> existing = new LinkedList<StorageType>(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index 8012f30134366..25c50da6df6f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -48,6 +48,7 @@
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
@@ -59,6 +60,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -82,6 +84,7 @@
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
 import org.apache.hadoop.hdfs.server.namenode.sps.BlockStorageMovementAttemptedItems;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
@@ -95,6 +98,7 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.util.ExitUtil;
 import org.junit.After;
 import org.junit.Assert;
@@ -1851,4 +1855,114 @@ public void testExternalSPSMetricsExposedToJMX() throws Exception {
       shutdownCluster();
     }
   }
+
+  private void writeConfigFile(FileSystem localFs, Path name, List<String> nodes)
+      throws IOException {
+    // Delete the file if it already exists.
+    if (localFs.exists(name)) {
+      localFs.delete(name, true);
+    }
+
+    FSDataOutputStream stm = localFs.create(name);
+
+    if (nodes != null) {
+      for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
+        String node = it.next();
+        stm.writeBytes(node);
+        stm.writeBytes("\n");
+      }
+    }
+    stm.close();
+  }
+
+  private void waitNodeState(DatanodeInfo node, DatanodeInfo.AdminStates state) {
+    boolean done = state == node.getAdminState();
+    while (!done) {
+      LOG.info("Waiting for node " + node + " to change state to " + state
+          + " current state: " + node.getAdminState());
+      try {
+        Thread.sleep(1 * 500);
+      } catch (InterruptedException e) {
+        // nothing
+      }
+      done = state == node.getAdminState();
+    }
+    LOG.info("node " + node + " reached the state " + state);
+  }
+
+  @Test
+  public void testSPSWithDecommission() throws Exception {
+    try {
+      FileSystem localFs = FileSystem.getLocal(config);
+      Path workingDir = localFs.getWorkingDirectory();
+      Path decommissionDir = new Path(workingDir,
+          PathUtils.getTestDirName(getClass()) + "/work-dir/decommission");
+      Path hostsFile = new Path(decommissionDir, "hosts");
+      Path excludeFile = new Path(decommissionDir, "exclude");
+      writeConfigFile(localFs, hostsFile, null);
+      writeConfigFile(localFs, excludeFile, null);
+      config.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
+      config.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
+
+      // Start a cluster with 5 datanodes.
+      final int numOfDn = 5;
+      StorageType[][] newtypes = new StorageType[][]{
+          {StorageType.SSD, StorageType.DISK},
+          {StorageType.SSD, StorageType.DISK},
+          {StorageType.SSD, StorageType.DISK},
+          {StorageType.SSD, StorageType.DISK},
+          {StorageType.SSD, StorageType.DISK}};
+      hdfsCluster = startCluster(config, newtypes, numOfDn, 2, CAPACITY);
+      hdfsCluster.waitActive();
+
+      // Write a file with replication 3: [DISK, DISK, DISK].
+      dfs = hdfsCluster.getFileSystem();
+      String file = "/testSPSWithDecommissionFile";
+      Path filePath = new Path(file);
+      writeContent(file, (short) 3);
+
+      // Select 2 datanodes that hold blocks for decommission.
+      ArrayList<String> excludedList = new ArrayList<>();
+      DFSClient client = new DFSClient(hdfsCluster.getNameNode(0).
+          getNameNodeAddress(), config);
+      DatanodeInfo[] info = client.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
+      int count = 0;
+      int total = 2;
+      int[] arr = new int[total];
+      for (int j = 0, i = 0; i < info.length; ++i) {
+        if (count < total && info[i].getNumBlocks() != 0) {
+          excludedList.add(info[i].getXferAddr());
+          count++;
+          arr[j++] = i;
+        }
+      }
+
+      // Before decommission, set the ALL_SSD storage policy on the path.
+      dfs.setStoragePolicy(filePath, "ALL_SSD");
+
+      // Decommission the selected nodes; the replicas will become
+      // [DISK, DISK, DISK, SSD, SSD].
+      writeConfigFile(localFs, excludeFile, excludedList);
+      hdfsCluster.getNamesystem(0)
+          .getBlockManager()
+          .getDatanodeManager()
+          .refreshNodes(config);
+      for (int index = 0; index < arr.length; ++index) {
+        DatanodeInfo ret = NameNodeAdapter
+            .getDatanode(hdfsCluster.getNamesystem(0), info[arr[index]]);
+        waitNodeState(ret, DatanodeInfo.AdminStates.DECOMMISSIONED);
+      }
+
+      // After decommission, set the HOT storage policy and satisfy it.
+      dfs.setStoragePolicy(filePath, "HOT");
+      dfs.satisfyStoragePolicy(filePath);
+
+      // The replicas should no longer be on SSD storage.
+      hdfsCluster.triggerHeartbeats();
+      DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 5, 30000,
+          dfs);
+    } finally {
+      shutdownCluster();
+    }
+  }
 }