BlockIdManager.java
@@ -312,7 +312,7 @@ public static boolean isStripedBlockID(long id) {
* and the other 60 bits are 1. Group ID is the first 60 bits of any
* data/parity block id in the same striped block group.
*/
static long convertToStripedID(long id) {
public static long convertToStripedID(long id) {
return id & (~HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
}
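
A minimal standalone sketch of the mask arithmetic (an illustration, not part of the patch; it assumes BLOCK_GROUP_INDEX_MASK == 15 in HdfsServerConstants, i.e. the low 4 bits of a striped block ID carry the index of the internal block within its group, matching the javadoc above):

  public final class StripedIdDemo {
    // Assumption: mirrors HdfsServerConstants.BLOCK_GROUP_INDEX_MASK.
    private static final long BLOCK_GROUP_INDEX_MASK = 15L;

    static long convertToStripedID(long id) {
      return id & ~BLOCK_GROUP_INDEX_MASK;
    }

    public static void main(String[] args) {
      // Hypothetical striped group ID: sign bit set, low 4 bits zero.
      long groupId = Long.MIN_VALUE | 0x12340L;
      for (int index = 0; index < 9; index++) { // RS(6,3): 9 internal blocks
        long internalBlockId = groupId + index;
        // Every internal block ID masks back to the same group ID.
        assert convertToStripedID(internalBlockId) == groupId;
      }
      System.out.println("all internal IDs share group ID " + groupId);
    }
  }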

DirectoryScanner.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
@@ -564,6 +565,19 @@ private void scan() {
// volumeMap record and on-disk files do not match.
statsRecord.duplicateBlocks++;
addDifference(diffRecord, statsRecord, info);
} else if (BlockIdManager.isStripedBlockID(info.getBlockId()) && d > 0) {
long bgId = BlockIdManager.convertToStripedID(info.getBlockId());
boolean isPrevBlockStriped = BlockIdManager.isStripedBlockID(
blockpoolReport.get(d - 1).getBlockId());
if (isPrevBlockStriped) {
long prevBlockBGId = BlockIdManager.convertToStripedID(
blockpoolReport.get(d - 1).getBlockId());
// Two EC blocks that belong to the same block group.
if (bgId == prevBlockBGId) {
statsRecord.duplicateBlocks++;
addDifference(diffRecord, statsRecord, info);
}
}
}
}
d++;
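
The new branch appears to rely on the block pool report being sorted by block ID, so two internal blocks of the same striped group show up next to each other. A hedged standalone sketch of that adjacency check, under the same assumptions as above (striped IDs have the sign bit set; the low 4 bits are the group index):

  import java.util.Arrays;

  public final class DupGroupScanDemo {
    private static final long BLOCK_GROUP_INDEX_MASK = 15L;

    static boolean isStripedBlockID(long id) {
      return id < 0; // assumption: striped block IDs are negative
    }

    static long convertToStripedID(long id) {
      return id & ~BLOCK_GROUP_INDEX_MASK;
    }

    public static void main(String[] args) {
      long group = Long.MIN_VALUE | 0x40L;
      // Internal blocks 1 and 2 of one group on the same datanode, plus a
      // member of an unrelated group: only the first pair should be flagged.
      long[] report = {group + 2, group + 0x100L, group + 1};
      Arrays.sort(report);
      for (int d = 1; d < report.length; d++) {
        if (isStripedBlockID(report[d]) && isStripedBlockID(report[d - 1])
            && convertToStripedID(report[d]) == convertToStripedID(report[d - 1])) {
          System.out.println("duplicate group member at position " + d);
        }
      }
    }
  }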
FsDatasetImpl.java
@@ -60,6 +60,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
@@ -2846,6 +2847,15 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
LOG.warn("Failed to delete " + diskFile);
}
}
} else if (BlockIdManager.isStripedBlockID(scanInfo.getBlockId())) {
// Two EC blocks with the same block group ID on one datanode:
// delete the duplicate replica found on disk.
if (fileIoProvider.delete(vol, diskFile)) {
volumeMap.remove(bpid, blockId);
LOG.info("Delete duplicated block group EC block {} successfully.",
scanInfo.getBlockId());
return;
}
LOG.warn("Failed to delete duplicated ec block {}", diskFile);
}
} else {
// Block refers to a block file that does not exist.
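
A hedged sketch of this resolution step in isolation: when the scanner flags a striped replica whose group already has another internal block on the datanode, the dataset deletes the on-disk file and drops the entry from its replica map. Plain stand-ins are used here: a HashMap for volumeMap and File#delete for fileIoProvider.delete(vol, diskFile):

  import java.io.File;
  import java.io.IOException;
  import java.util.HashMap;
  import java.util.Map;

  public final class ResolveDupEcBlockDemo {
    static boolean resolveDuplicate(Map<Long, File> volumeMap, long blockId) {
      File diskFile = volumeMap.get(blockId);
      if (diskFile != null && diskFile.delete()) {
        volumeMap.remove(blockId); // keep in-memory state consistent with disk
        return true;               // duplicate resolved
      }
      return false;                // deletion failed; leave state untouched
    }

    public static void main(String[] args) throws IOException {
      Map<Long, File> volumeMap = new HashMap<>();
      File blockFile = File.createTempFile("blk_", ".data");
      volumeMap.put(42L, blockFile);
      System.out.println("resolved = " + resolveDuplicate(volumeMap, 42L));
    }
  }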
TestDirectoryScanner.java
@@ -35,6 +35,7 @@
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -44,6 +45,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@@ -55,13 +57,26 @@
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
@@ -74,13 +89,18 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Time;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -106,6 +126,17 @@ public class TestDirectoryScanner {
private final Random r = new Random();
private static final int BLOCK_LENGTH = 100;

private DistributedFileSystem dfs;
private FSNamesystem fsNamesystem;
private final Path dirPath = new Path("/striped");
private Path filePath = new Path(dirPath, "file");
private final ErasureCodingPolicy ecPolicy = getPolicy();
private final short dataBlocks = (short) ecPolicy.getNumDataUnits();
private final short parityBlocks = (short) ecPolicy.getNumParityUnits();
private final int cellSize = ecPolicy.getCellSize();
private final int stripesPerBlock = 6;
private final int blockSize = stripesPerBlock * cellSize;
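// For reference (assuming the default EC policy is RS-6-3-1024k):
// dataBlocks = 6, parityBlocks = 3, cellSize = 1 MiB, so
// blockSize = stripesPerBlock * cellSize = 6 MiB and a full block group
// carries dataBlocks * blockSize = 36 MiB of data.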

public Configuration getConfiguration() {
Configuration configuration = new HdfsConfiguration();
configuration.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
@@ -116,6 +147,14 @@ public Configuration getConfiguration() {
return configuration;
}

public Configuration getConfigurationForECTest() {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 3);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3);
return conf;
}

@Before
public void setup() {
LazyPersistTestCase.initCacheManipulator();
@@ -562,6 +601,154 @@ public void testRegularBlock() throws Exception {
}
}

public ErasureCodingPolicy getPolicy() {
return StripedFileTestUtil.getDefaultECPolicy();
}

/**
* Test case where two EC blocks of the same block group end up on one
* datanode.
* @throws Exception
*/
@Test(timeout = 600000)
public void testDirectoryScannerWithDupStripedReplicas() throws Exception {
// Run the test with and without parallel scanning
for (int parallelism = 1; parallelism < 3; parallelism++) {
runTestWithDupStripedReplicas(parallelism);
}
}


/**
* Reproduces the following situation: an EC reconstruction task writes an
* internal block of a block group to a datanode that already stores
* another internal block of the same group; the DirectoryScanner should
* then detect and remove the duplicate.
* @param parallelism number of directory scanner threads to use
* @throws Exception
*/
public void runTestWithDupStripedReplicas(int parallelism) throws Exception {
StaticMapping.resetMap();
Configuration conf = getConfigurationForECTest();
final ArrayList<String> rackList = new ArrayList<String>();
final ArrayList<String> hostList = new ArrayList<String>();
for (int i = 0; i < 9; i++) {
for (int j = 1; j < 4; j++) {
rackList.add("/rack" + i);
hostList.add("/host" + i + j);
}
}
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyRackFaultTolerant.class,
BlockPlacementPolicy.class);
conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
parallelism);
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(hostList.size())
.racks(rackList.toArray(new String[rackList.size()]))
.hosts(hostList.toArray(new String[hostList.size()]))
.build();
try {
cluster.waitActive();
dfs = cluster.getFileSystem();
fsNamesystem = cluster.getNamesystem();
dfs.enableErasureCodingPolicy(ecPolicy.getName());
dfs.mkdirs(dirPath);
dfs.getClient().setErasureCodingPolicy(dirPath.toString(),
ecPolicy.getName());

bpid = cluster.getNamesystem().getBlockPoolId();
client = cluster.getFileSystem().getClient();
final int filesize = ecPolicy.getNumDataUnits() * blockSize;
byte[] contents = StripedFileTestUtil.generateBytes(filesize);
LOG.info("Writing file " + filePath);
DFSTestUtil.writeFile(dfs, filePath, contents);

LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), 0, filesize);
LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(bg,
cellSize, dataBlocks, parityBlocks);
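// parseStripedBlockGroup expands the logical block group into its
// internal blocks indexed by block index; with the assumed RS-6-3
// policy, blocks[0..5] are data and blocks[6..8] are parity.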

String datanode1Uuid = blocks[1].getLocations()[0].getDatanodeUuid();
String datanode2Uuid = blocks[2].getLocations()[0].getDatanodeUuid();
String blockTwoStorageID = blocks[2].getLocations()[0].getStorageID();
DatanodeDescriptor datanodeDescriptor2 = fsNamesystem.getBlockManager()
.getDatanodeManager().getDatanode(datanode2Uuid);

DatanodeStorageInfo recoveryTargetStorage = null;
DatanodeStorageInfo[] dn2StorageInfos = datanodeDescriptor2.getStorageInfos();
for (int i = 0; i < dn2StorageInfos.length; i++) {
if (!dn2StorageInfos[i].getStorageID().equals(blockTwoStorageID)) {
recoveryTargetStorage = dn2StorageInfos[i];
break;
}
}
Assert.assertNotNull(recoveryTargetStorage);

byte[] liveBlkIndices = new byte[dataBlocks];
byte[] excludeReconstructedIndices = new byte[0];
DatanodeDescriptor[] srcNodes = new DatanodeDescriptor[dataBlocks];
int numSrcChosen = 0;
for (LocatedBlock blk : blocks) {
String tmpDatanodeUuid = blk.getLocations()[0].getDatanodeUuid();
if (!tmpDatanodeUuid.equals(datanode1Uuid) && numSrcChosen < dataBlocks) {
srcNodes[numSrcChosen] = fsNamesystem.getBlockManager()
.getDatanodeManager().getDatanode(tmpDatanodeUuid);
byte blockIndex = BlockIdManager.getBlockIndex(blk.getBlock().getLocalBlock());
liveBlkIndices[numSrcChosen] = blockIndex;
LOG.info("Chose source node with block index " + blockIndex);
numSrcChosen++;
}
if (numSrcChosen == dataBlocks) {
break;
}
}
DatanodeStorageInfo[] targetDSInfos = new DatanodeStorageInfo[] {recoveryTargetStorage};
final BlockInfo storedBlock = fsNamesystem.getBlockManager().getStoredBlock(
blocks[0].getBlock().getLocalBlock());
BlockInfoStriped blockGroup = (BlockInfoStriped) storedBlock;

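// Hand-craft a reconstruction task targeting datanode2, which already
// stores the internal block at index 2: the task reconstructs the index
// left out of the live set above (datanode1's block) onto datanode2's
// second storage, so datanode2 ends up holding two internal blocks of
// the same block group.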
BlockECReconstructionCommand.BlockECReconstructionInfo blkECRecoveryBlock1 =
new BlockECReconstructionCommand.BlockECReconstructionInfo(
new ExtendedBlock(blocks[0].getBlock().getBlockPoolId(), blockGroup),
srcNodes, targetDSInfos, liveBlkIndices, excludeReconstructedIndices, ecPolicy);

List<BlockECReconstructionCommand.BlockECReconstructionInfo> ecTasks = new ArrayList<>();
ecTasks.add(blkECRecoveryBlock1);

DataNode datanode2 = null;
for (DataNode dn : cluster.getDataNodes()) {
if (dn.getDatanodeUuid().equals(datanodeDescriptor2.getDatanodeUuid())) {
datanode2 = dn;
break;
}
}
Assert.assertNotNull(datanode2);
fds = DataNodeTestUtils.getFSDataset(datanode2);
scanner = new DirectoryScanner(fds, conf);
scanner.setRetainDiffs(true);
datanode2.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
String blockPoolId = blocks[0].getBlock().getBlockPoolId();
FsDatasetTestUtils fsDatasetTestUtilsDN2 = cluster.getFsDatasetTestUtils(datanode2);
ExtendedBlock internalBlock1 = new ExtendedBlock(blockPoolId, blocks[1].getBlock().getBlockId());
ExtendedBlock internalBlock2 = new ExtendedBlock(blockPoolId, blocks[2].getBlock().getBlockId());
// Wait for the reconstruction task to finish writing the duplicate
// internal block instead of sleeping for a fixed interval.
GenericTestUtils.waitFor(
() -> fsDatasetTestUtilsDN2.fetchReplica(internalBlock1) != null
&& fsDatasetTestUtilsDN2.fetchReplica(internalBlock2) != null,
100, 10000);
Replica replica1 = fsDatasetTestUtilsDN2.fetchReplica(internalBlock1);
Replica replica2 = fsDatasetTestUtilsDN2.fetchReplica(internalBlock2);
Assert.assertNotNull(replica1);
Assert.assertNotNull(replica2);
LOG.info("EC Recovery is finished.");

// Trigger a full block report; a "moved to storageType" message should
// be visible in the datanode logs at this point.
datanode2.triggerBlockReport(new BlockReportOptions.Factory().setIncremental(false).build());

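// Assumed scan() helper signature: (totalBlocks, diffsize, missingMetaFile,
// missingBlockFile, missingMemoryBlocks, mismatchBlocks, duplicateBlocks):
// expect 2 blocks on this datanode, 1 difference, 1 duplicate EC block.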
scan(2, 1, 0, 0, 0, 0, 1);
replica1 = fsDatasetTestUtilsDN2.fetchReplica(internalBlock1);
replica2 = fsDatasetTestUtilsDN2.fetchReplica(internalBlock2);
Assert.assertNotNull(replica1);
Assert.assertNull(replica2);
} finally {
if (scanner != null) {
scanner.shutdown();
scanner = null;
}
cluster.shutdown();
}
}

@Test(timeout = 600000)
public void testDirectoryScanner() throws Exception {
// Run the test with and without parallel scanning