HDFS-8781. Erasure Coding: Correctly handle BlockManager#InvalidateBlocks for striped block. Contributed by Yi Liu.
Jing9 committed Jul 22, 2015
1 parent f8f7a92 commit 5956d23
Showing 4 changed files with 98 additions and 28 deletions.
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt (3 additions, 0 deletions)
@@ -367,3 +367,6 @@
 
     HDFS-8760. Erasure Coding: reuse BlockReader when reading the same block in pread.
     (jing9)
+
+    HDFS-8781. Erasure Coding: Correctly handle BlockManager#InvalidateBlocks for
+    striped block. (Yi Liu via jing9)
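
Why the change is needed, in short: for a striped file the NameNode's BlockInfo represents the whole block group, while each DataNode stores and reports one internal block whose ID encodes its index within the group. The invalidateBlocks map is keyed by the block actually held on a DataNode, so every call site must first translate the group block into the per-storage internal block. Below is a minimal sketch of that translation, assuming the HDFS-7285 convention that internal block i of a group has ID groupId + i; the class, helper name, and parameters are illustrative, not the committed code:

import org.apache.hadoop.hdfs.protocol.Block;

class StripedInvalidateSketch {
  /**
   * Map a stored block to the block actually kept on one storage.
   * For a contiguous block the replica is the stored block itself; for a
   * striped group it is the internal block at the storage's index.
   */
  static Block blockOnStorage(Block group, boolean striped,
      int storageIndex, long internalLength) {
    if (!striped) {
      return group;               // contiguous: replica == stored block
    }
    if (storageIndex < 0) {
      return null;                // this storage holds no internal block
    }
    // internal block ID = group ID + index within the group
    return new Block(group.getBlockId() + storageIndex, internalLength,
        group.getGenerationStamp());
  }
}

The null return is why every caller in the patch below now guards its invalidateBlocks operation with an if (b != null) check.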
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -783,7 +783,10 @@ public LocatedBlock convertLastBlockToUnderConstruction(

     // remove this block from the list of pending blocks to be deleted.
     for (DatanodeStorageInfo storage : targets) {
-      invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock);
+      final Block b = getBlockOnStorage(oldBlock, storage);
+      if (b != null) {
+        invalidateBlocks.remove(storage.getDatanodeDescriptor(), b);
+      }
     }
 
     // Adjust safe-mode totals, since under-construction blocks don't
@@ -802,12 +805,14 @@ public LocatedBlock convertLastBlockToUnderConstruction(
   /**
    * Get all valid locations of the block
    */
-  private List<DatanodeStorageInfo> getValidLocations(Block block) {
+  private List<DatanodeStorageInfo> getValidLocations(BlockInfo block) {
     final List<DatanodeStorageInfo> locations
         = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       // filter invalidate replicas
-      if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
+      Block b = getBlockOnStorage(block, storage);
+      if(b != null &&
+          !invalidateBlocks.contains(storage.getDatanodeDescriptor(), b)) {
         locations.add(storage);
       }
     }
@@ -1156,7 +1161,10 @@ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
     while(it.hasNext()) {
       BlockInfo block = it.next();
       removeStoredBlock(block, node);
-      invalidateBlocks.remove(node, block);
+      final Block b = getBlockOnStorage(block, storageInfo);
+      if (b != null) {
+        invalidateBlocks.remove(node, b);
+      }
     }
     namesystem.checkSafeMode();
   }
@@ -1184,7 +1192,7 @@ private void addToInvalidates(BlockInfo storedBlock) {
     for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock,
         State.NORMAL)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      final Block b = getBlockToInvalidate(storedBlock, storage);
+      final Block b = getBlockOnStorage(storedBlock, storage);
       if (b != null) {
         invalidateBlocks.add(b, node, false);
         datanodes.append(node).append(" ");
@@ -1196,7 +1204,7 @@ private void addToInvalidates(BlockInfo storedBlock) {
     }
   }
 
-  private Block getBlockToInvalidate(BlockInfo storedBlock,
+  private Block getBlockOnStorage(BlockInfo storedBlock,
       DatanodeStorageInfo storage) {
     return storedBlock.isStriped() ?
         ((BlockInfoStriped) storedBlock).getBlockOnStorage(storage) : storedBlock;
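
Renaming getBlockToInvalidate to getBlockOnStorage reflects its broader role after this patch: every path that touches invalidateBlocks (convertLastBlockToUnderConstruction, getValidLocations, removeBlocksAssociatedTo, removeZombieReplicas, processChosenExcessReplica) now funnels through it, and each caller handles the null case, which for a striped group means the given storage holds no internal block.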
@@ -2054,7 +2062,10 @@ private void removeZombieReplicas(BlockReportContext context,
       // more than one storage on a datanode (and because it's a difficult
       // assumption to really enforce)
       removeStoredBlock(block, zombie.getDatanodeDescriptor());
-      invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
+      Block b = getBlockOnStorage(block, zombie);
+      if (b != null) {
+        invalidateBlocks.remove(zombie.getDatanodeDescriptor(), b);
+      }
     }
     assert(zombie.numBlocks() == 0);
     LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
@@ -3273,7 +3284,7 @@ private void processChosenExcessReplica(
     // should be deleted. Items are removed from the invalidate list
     // upon giving instructions to the datanodes.
     //
-    final Block blockToInvalidate = getBlockToInvalidate(storedBlock, chosen);
+    final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen);
     addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
     blockLog.info("BLOCK* chooseExcessReplicates: "
         + "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
@@ -3838,6 +3849,12 @@ private int invalidateWorkForOneNode(DatanodeInfo dn) {
     return toInvalidate.size();
   }
 
+  @VisibleForTesting
+  public boolean containsInvalidateBlock(final DatanodeInfo dn,
+      final Block block) {
+    return invalidateBlocks.contains(dn, block);
+  }
+
   boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) {
     if (!this.shouldCheckForEnoughRacks) {
       return true;
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -694,6 +694,13 @@ public Block[] getInvalidateBlocks(int maxblocks) {
     }
   }
 
+  @VisibleForTesting
+  public boolean containsInvalidateBlock(Block block) {
+    synchronized (invalidateBlocks) {
+      return invalidateBlocks.contains(block);
+    }
+  }
+
   /**
    * @return Approximate number of blocks currently scheduled to be written
    */
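The two @VisibleForTesting hooks appear to cover the two places an invalidated block can live: BlockManager#containsInvalidateBlock checks the NameNode-wide InvalidateBlocks map, while DatanodeDescriptor#containsInvalidateBlock checks the per-DataNode queue that entries move to once invalidation work has been scheduled (see invalidateWorkForOneNode above). The new test below asserts the disjunction of the two for exactly that reason.
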
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -22,13 +22,16 @@
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.junit.After;
 import org.junit.Assert;
@@ -274,28 +277,68 @@ public void testReportBadBlock() throws IOException {
       DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
     }
 
-    // do stateful read
-    ByteBuffer result = ByteBuffer.allocate(length);
-    ByteBuffer buf = ByteBuffer.allocate(1024);
-    int readLen = 0;
-    int ret;
-    try (FSDataInputStream in = fs.open(file)) {
-      while ((ret = in.read(buf)) >= 0) {
-        readLen += ret;
-        buf.flip();
-        result.put(buf);
-        buf.clear();
-      }
-    }
-    Assert.assertEquals("The length of file should be the same to write size",
-        length, readLen);
-    Assert.assertArrayEquals(bytes, result.array());
-
-    // check whether the corruption has been reported to the NameNode
-    final FSNamesystem ns = cluster.getNamesystem();
-    final BlockManager bm = ns.getBlockManager();
-    BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
-        .asFile().getBlocks())[0];
-    Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
+    try {
+      // do stateful read
+      ByteBuffer result = ByteBuffer.allocate(length);
+      ByteBuffer buf = ByteBuffer.allocate(1024);
+      int readLen = 0;
+      int ret;
+      try (FSDataInputStream in = fs.open(file)) {
+        while ((ret = in.read(buf)) >= 0) {
+          readLen += ret;
+          buf.flip();
+          result.put(buf);
+          buf.clear();
+        }
+      }
+      Assert.assertEquals("The length of file should be the same to write size",
+          length, readLen);
+      Assert.assertArrayEquals(bytes, result.array());
+
+      // check whether the corruption has been reported to the NameNode
+      final FSNamesystem ns = cluster.getNamesystem();
+      final BlockManager bm = ns.getBlockManager();
+      BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
+          .asFile().getBlocks())[0];
+      Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
+    } finally {
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+      }
+    }
   }
 
+  @Test
+  public void testInvalidateBlock() throws IOException {
+    final Path file = new Path("/invalidate");
+    final int length = 10;
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(fs, file, bytes);
+
+    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    Assert.assertNotEquals(-1, dnIndex);
+    LocatedStripedBlock slb = (LocatedStripedBlock) fs.getClient()
+        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
+        cellSize, dataBlocks, parityBlocks);
+    final Block b = blks[0].getBlock().getLocalBlock();
+
+    DataNode dn = cluster.getDataNodes().get(dnIndex);
+    // disable the heartbeat from the DN so that the invalidated block record
+    // is kept in the NameNode until the heartbeat expires and the NN marks
+    // the DN as dead
+    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+
+    try {
+      // delete the file
+      fs.delete(file, true);
+      // check that the block is added to invalidateBlocks
+      final FSNamesystem fsn = cluster.getNamesystem();
+      final BlockManager bm = fsn.getBlockManager();
+      DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+      Assert.assertTrue(bm.containsInvalidateBlock(
+          blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
+    } finally {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+    }
+  }
}
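
To run just the new test, something like the following should work from the source tree root (assuming the standard Maven surefire setup of the Hadoop build; the test class name is taken from the file header above):

mvn test -pl hadoop-hdfs-project/hadoop-hdfs -Dtest=TestReadStripedFileWithDecoding#testInvalidateBlock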
