diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
index a79e4c594ac2b..4a6ae3aafb809 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
@@ -312,7 +312,7 @@ public static boolean isStripedBlockID(long id) {
    * and the other 60 bits are 1. Group ID is the first 60 bits of any
    * data/parity block id in the same striped block group.
    */
-  static long convertToStripedID(long id) {
+  public static long convertToStripedID(long id) {
     return id & (~HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index f97dbbfcd6e8a..50d7b44b347c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
@@ -564,6 +565,19 @@ private void scan() {
             // volumeMap record and on-disk files do not match.
             statsRecord.duplicateBlocks++;
             addDifference(diffRecord, statsRecord, info);
+          } else if (BlockIdManager.isStripedBlockID(info.getBlockId()) && d > 0) {
+            long bgId = BlockIdManager.convertToStripedID(info.getBlockId());
+            boolean isPrevBlockStriped = BlockIdManager.isStripedBlockID(
+                blockpoolReport.get(d - 1).getBlockId());
+            if (isPrevBlockStriped) {
+              long prevBlockBGId = BlockIdManager.convertToStripedID(
+                  blockpoolReport.get(d - 1).getBlockId());
+              // Two EC blocks that belong to the same block group.
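+              // This relies on the block pool report being sorted by block
+              // ID, so internal blocks of one group appear consecutively and
+              // comparing with the previous entry is sufficient.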
+              if (bgId == prevBlockBGId) {
+                statsRecord.duplicateBlocks++;
+                addDifference(diffRecord, statsRecord, info);
+              }
+            }
           }
         }
         d++;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index d81b5411c531b..5a48d900d7e9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
@@ -2846,6 +2847,15 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
             LOG.warn("Failed to delete " + diskFile);
           }
         }
+      } else if (BlockIdManager.isStripedBlockID(scanInfo.getBlockId())) {
+        // Two EC blocks with the same block group ID on one datanode;
+        // delete the duplicated one from disk and from the volumeMap.
+        if (fileIoProvider.delete(vol, diskFile)) {
+          volumeMap.remove(bpid, blockId);
+          LOG.info("Deleted duplicated EC block {} of the same block group.",
+              scanInfo.getBlockId());
+          return;
+        }
+        LOG.warn("Failed to delete duplicated EC block {}", diskFile);
       }
     } else {
       // Block refers to a block file that does not exist.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 74c70cec76967..b06c779791db6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -35,6 +35,7 @@
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -44,6 +45,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -55,13 +57,26 @@
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
+import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
@@ -74,13 +89,18 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.StaticMapping;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.SimpleLayout;
 import org.apache.log4j.WriterAppender;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -106,6 +126,17 @@ public class TestDirectoryScanner {
   private final Random r = new Random();
   private static final int BLOCK_LENGTH = 100;
 
+  private DistributedFileSystem dfs;
+  private FSNamesystem fsNamesystem;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private final ErasureCodingPolicy ecPolicy = getPolicy();
+  private final short dataBlocks = (short) ecPolicy.getNumDataUnits();
+  private final short parityBlocks = (short) ecPolicy.getNumParityUnits();
+  private final int cellSize = ecPolicy.getCellSize();
+  private final int stripesPerBlock = 6;
+  private final int blockSize = stripesPerBlock * cellSize;
+
   public Configuration getConfiguration() {
     Configuration configuration = new HdfsConfiguration();
     configuration.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
@@ -116,6 +147,14 @@ public Configuration getConfiguration() {
     return configuration;
   }
 
+  public Configuration getConfigurationForECTest() {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 3);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3);
+    return conf;
+  }
+
   @Before
   public void setup() {
     LazyPersistTestCase.initCacheManipulator();
@@ -562,6 +601,154 @@ public void testRegularBlock() throws Exception {
     }
   }
 
+  public ErasureCodingPolicy getPolicy() {
+    return StripedFileTestUtil.getDefaultECPolicy();
+  }
+
+  /**
+   * Test the case where two EC blocks of the same block group end up on one
+   * datanode.
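+   *
+   * The directory scanner is expected to flag the extra internal block as a
+   * duplicate so that FsDatasetImpl#checkAndUpdate can delete it.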
+   * @throws Exception
+   */
+  @Test(timeout = 600000)
+  public void testDirectoryScannerWithDupStripedReplicas() throws Exception {
+    // Run the test with and without parallel scanning
+    for (int parallelism = 1; parallelism < 3; parallelism++) {
+      runTestWithDupStripedReplicas(parallelism);
+    }
+  }
+
+  /**
+   * Reproduces the following situation: an internal block of an EC block
+   * group is reconstructed onto a datanode that already stores another
+   * internal block of the same group, so two blocks of one group end up on
+   * a single datanode. The directory scanner should detect the duplicate
+   * and have it removed.
+   *
+   * @param parallelism number of directory scanner threads to use
+   * @throws Exception
+   */
+  public void runTestWithDupStripedReplicas(int parallelism) throws Exception {
+    StaticMapping.resetMap();
+    Configuration conf = getConfigurationForECTest();
+    final ArrayList<String> rackList = new ArrayList<>();
+    final ArrayList<String> hostList = new ArrayList<>();
+    for (int i = 0; i < 9; i++) {
+      for (int j = 1; j < 4; j++) {
+        rackList.add("/rack" + i);
+        hostList.add("/host" + i + j);
+      }
+    }
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
+        parallelism);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(hostList.size())
+        .racks(rackList.toArray(new String[rackList.size()]))
+        .hosts(hostList.toArray(new String[hostList.size()]))
+        .build();
+    try {
+      cluster.waitActive();
+      dfs = cluster.getFileSystem();
+      fsNamesystem = cluster.getNamesystem();
+      dfs.enableErasureCodingPolicy(ecPolicy.getName());
+      dfs.mkdirs(dirPath);
+      dfs.getClient().setErasureCodingPolicy(dirPath.toString(),
+          ecPolicy.getName());
+
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      client = cluster.getFileSystem().getClient();
+      final int filesize = ecPolicy.getNumDataUnits() * blockSize;
+      byte[] contents = StripedFileTestUtil.generateBytes(filesize);
+      LOG.info("Writing file " + filePath);
+      DFSTestUtil.writeFile(dfs, filePath, contents);
+
+      LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+          filePath.toString(), 0, filesize);
+      LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+      final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(bg,
+          cellSize, dataBlocks, parityBlocks);
+
+      String datanode1Uuid = blocks[1].getLocations()[0].getDatanodeUuid();
+      String datanode2Uuid = blocks[2].getLocations()[0].getDatanodeUuid();
+      String blockTwoStorageID = blocks[2].getLocations()[0].getStorageID();
+      DatanodeDescriptor datanodeDescriptor2 = fsNamesystem.getBlockManager()
+          .getDatanodeManager().getDatanode(datanode2Uuid);
+
+      DatanodeStorageInfo recoveryTargetStorage = null;
+      DatanodeStorageInfo[] dn2StorageInfos =
+          datanodeDescriptor2.getStorageInfos();
+      for (int i = 0; i < dn2StorageInfos.length; i++) {
+        if (!dn2StorageInfos[i].getStorageID().equals(blockTwoStorageID)) {
+          recoveryTargetStorage = dn2StorageInfos[i];
+          break;
+        }
+      }
+      Assert.assertNotNull(recoveryTargetStorage);
+
+      byte[] liveBlkIndices = new byte[dataBlocks];
+      byte[] excludeReconstructedIndices = new byte[0];
+      DatanodeDescriptor[] srcNodes =
+          new DatanodeDescriptor[ecPolicy.getNumDataUnits()];
+      int numSrcChosen = 0;
+      for (LocatedBlock blk : blocks) {
+        String tmpDatanodeUuid = blk.getLocations()[0].getDatanodeUuid();
+        if (!tmpDatanodeUuid.equals(datanode1Uuid)
+            && numSrcChosen < dataBlocks) {
+          srcNodes[numSrcChosen] = fsNamesystem.getBlockManager()
+              .getDatanodeManager().getDatanode(tmpDatanodeUuid);
+          byte blockIndex =
+              BlockIdManager.getBlockIndex(blk.getBlock().getLocalBlock());
+          liveBlkIndices[numSrcChosen] = blockIndex;
+          LOG.info("Chosen source block index is " + blockIndex);
+          numSrcChosen++;
+        }
+        if (numSrcChosen == dataBlocks) {
+          break;
+        }
+      }
+
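+      // Reconstruct the internal block that currently lives only on
+      // datanode1 onto a spare storage of datanode2, which already holds
+      // another internal block of this group, so two blocks of the same
+      // block group end up on one datanode.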
+      DatanodeStorageInfo[] targetDSInfos =
+          new DatanodeStorageInfo[] {recoveryTargetStorage};
+      final BlockInfo storedBlock = fsNamesystem.getBlockManager()
+          .getStoredBlock(blocks[0].getBlock().getLocalBlock());
+      BlockInfoStriped stripedBlockInfo = (BlockInfoStriped) storedBlock;
+
+      BlockECReconstructionCommand.BlockECReconstructionInfo blkECRecoveryBlock1 =
+          new BlockECReconstructionCommand.BlockECReconstructionInfo(
+              new ExtendedBlock(blocks[0].getBlock().getBlockPoolId(),
+                  stripedBlockInfo),
+              srcNodes, targetDSInfos, liveBlkIndices,
+              excludeReconstructedIndices, ecPolicy);
+
+      List<BlockECReconstructionCommand.BlockECReconstructionInfo> ecTasks =
+          new ArrayList<>();
+      ecTasks.add(blkECRecoveryBlock1);
+
+      DataNode datanode2 = null;
+      for (DataNode dn : cluster.getDataNodes()) {
+        if (dn.getDatanodeUuid().equals(datanodeDescriptor2.getDatanodeUuid())) {
+          datanode2 = dn;
+          break;
+        }
+      }
+      Assert.assertNotNull(datanode2);
+      fds = DataNodeTestUtils.getFSDataset(datanode2);
+      scanner = new DirectoryScanner(fds, conf);
+      scanner.setRetainDiffs(true);
+      datanode2.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
+      Thread.sleep(3000);
+      String blockPoolId = blocks[0].getBlock().getBlockPoolId();
+      FsDatasetTestUtils fsDatasetTestUtilsDN2 =
+          cluster.getFsDatasetTestUtils(datanode2);
+      Replica replica1 = fsDatasetTestUtilsDN2.fetchReplica(
+          new ExtendedBlock(blockPoolId, blocks[1].getBlock().getBlockId()));
+      Replica replica2 = fsDatasetTestUtilsDN2.fetchReplica(
+          new ExtendedBlock(blockPoolId, blocks[2].getBlock().getBlockId()));
+      Assert.assertNotNull(replica1);
+      Assert.assertNotNull(replica2);
+      LOG.info("EC reconstruction is finished.");
+
+      // Trigger a full block report so the NameNode processes the
+      // reconstructed replica ("moved to storageType" shows up in the logs).
+      datanode2.triggerBlockReport(
+          new BlockReportOptions.Factory().setIncremental(false).build());
+
+      scan(2, 1, 0, 0, 0, 0, 1);
+      replica1 = fsDatasetTestUtilsDN2.fetchReplica(
+          new ExtendedBlock(blockPoolId, blocks[1].getBlock().getBlockId()));
+      replica2 = fsDatasetTestUtilsDN2.fetchReplica(
+          new ExtendedBlock(blockPoolId, blocks[2].getBlock().getBlockId()));
+      Assert.assertNotNull(replica1);
+      Assert.assertNull(replica2);
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+    }
+  }
+
   @Test(timeout = 600000)
   public void testDirectoryScanner() throws Exception {
     // Run the test with and without parallel scanning