Skip to content

Commit

Permalink
HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the bl…
Browse files Browse the repository at this point in the history
…ocklocations which doesn't satisfy BlockGroupSize.
  • Loading branch information
Zhe Zhang committed Aug 14, 2015
1 parent 1d37a88 commit b57c9a3
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 9 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
Expand Up @@ -397,3 +397,6 @@

HDFS-8854. Erasure coding: add ECPolicy to replace schema+cellSize in
hadoop-hdfs. (Walter Su via zhz)

HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the
blocklocations which doesn't satisfy BlockGroupSize. (Rakesh R via zhz)
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
Expand Down Expand Up @@ -167,18 +168,33 @@ void populate() throws IOException {

final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
excludedNodes);
if (lb.getLocations().length < HdfsConstants.NUM_DATA_BLOCKS) {
throw new IOException(
"Failed to get datablocks number of nodes from namenode: blockGroupSize= "
+ (HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS)
+ ", blocks.length= " + lb.getLocations().length);
}
final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
(LocatedStripedBlock)lb,
BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);

for (int i = 0; i < blocks.length; i++) {
if (!coordinator.getStripedDataStreamer(i).isFailed()) {
if (blocks[i] == null) {
getLastException().set(
new IOException("Failed to get following block, i=" + i));
} else {
followingBlocks.offer(i, blocks[i]);
}
StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
if (si.isFailed()) {
continue; // skipping failed data streamer
}
if (blocks[i] == null) {
// Set exception and close streamer as there is no block locations
// found for the parity block.
LOG.warn("Failed to get block location for parity block, index="
+ i);
si.getLastException().set(
new IOException("Failed to get following block, i=" + i));
si.setFailed(true);
si.endBlock();
si.close(true);
} else {
followingBlocks.offer(i, blocks[i]);
}
}
}
Expand All @@ -199,7 +215,11 @@ void populate() throws IOException {
.parseStripedBlockGroup((LocatedStripedBlock) updated,
BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
if (si.isFailed()) {
continue; // skipping failed data streamer
}
final ExtendedBlock bi = si.getBlock();
if (bi != null) {
final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
null, null, null, -1, updated.isCorrupt(), null);
Expand All @@ -225,7 +245,11 @@ void populate() throws IOException {
final ExtendedBlock newBG = newBlock(bg, newGS);
final ExtendedBlock updated = callUpdatePipeline(bg, newBG);
for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
if (si.isFailed()) {
continue; // skipping failed data streamer
}
final ExtendedBlock bi = si.getBlock();
updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp()));
}
}
Expand Down
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -33,13 +35,15 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.security.token.Token;
Expand Down Expand Up @@ -145,6 +149,86 @@ public void testBlockTokenExpired() throws Exception {
}
}

@Test(timeout = 90000)
public void testAddBlockWhenNoSufficientDataBlockNumOfNodes()
throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
try {
setup(conf);
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
// shutdown few datanodes to avoid getting sufficient data blocks number
// of datanodes
int killDns = dataNodes.size() / 2;
int numDatanodes = dataNodes.size() - killDns;
for (int i = 0; i < killDns; i++) {
cluster.stopDataNode(i);
}
cluster.restartNameNodes();
cluster.triggerHeartbeats();
DatanodeInfo[] info = dfs.getClient().datanodeReport(
DatanodeReportType.LIVE);
assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
final Path dirFile = new Path(dir, "ecfile");
FSDataOutputStream out = null;
try {
out = dfs.create(dirFile, true);
out.write("something".getBytes());
out.flush();
out.close();
Assert.fail("Failed to validate available dns against blkGroupSize");
} catch (IOException ioe) {
// expected
GenericTestUtils.assertExceptionContains("Failed: the number of "
+ "remaining blocks = 5 < the number of data blocks = 6", ioe);
DFSStripedOutputStream dfsout = (DFSStripedOutputStream) out
.getWrappedStream();

// get leading streamer and verify the last exception
StripedDataStreamer datastreamer = dfsout.getStripedDataStreamer(0);
try {
datastreamer.getLastException().check(true);
Assert.fail("Failed to validate available dns against blkGroupSize");
} catch (IOException le) {
GenericTestUtils.assertExceptionContains(
"Failed to get datablocks number of nodes from"
+ " namenode: blockGroupSize= 9, blocks.length= "
+ numDatanodes, le);
}
}
} finally {
tearDown();
}
}

@Test(timeout = 90000)
public void testAddBlockWhenNoSufficientParityNumOfNodes() throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
try {
setup(conf);
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
// shutdown few data nodes to avoid writing parity blocks
int killDns = (NUM_PARITY_BLOCKS - 1);
int numDatanodes = dataNodes.size() - killDns;
for (int i = 0; i < killDns; i++) {
cluster.stopDataNode(i);
}
cluster.restartNameNodes();
cluster.triggerHeartbeats();
DatanodeInfo[] info = dfs.getClient().datanodeReport(
DatanodeReportType.LIVE);
assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
Path srcPath = new Path(dir, "testAddBlockWhenNoSufficientParityNodes");
int fileLength = HdfsConstants.BLOCK_STRIPED_CELL_SIZE - 1000;
final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
DFSTestUtil.writeFile(dfs, srcPath, new String(expected));
StripedFileTestUtil.verifySeek(dfs, srcPath, fileLength);
} finally {
tearDown();
}
}

private void runTest(final Path p, final int length, final int killPos,
final int dnIndex, final boolean tokenExpire) throws Exception {
LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos
Expand Down

0 comments on commit b57c9a3

Please sign in to comment.