HDFS-8120. Erasure coding: created util class to analyze striped block groups. Contributed by Zhe Zhang and Li Bo.
Jing9 authored and Zhe Zhang committed May 26, 2015
1 parent ceb3d1c commit 5e8837d
Showing 12 changed files with 562 additions and 276 deletions.
@@ -1151,9 +1151,9 @@ void actualGetFromOneDataNode(final DNAddrPair datanode,
for (int i = 0; i < offsets.length; i++) {
int nread = reader.readAll(buf, offsets[i], lengths[i]);
updateReadStatistics(readStatistics, nread, reader);
if (nread != len) {
if (nread != lengths[i]) {
throw new IOException("truncated return from reader.read(): " +
"excpected " + len + ", got " + nread);
"excpected " + lengths[i] + ", got " + nread);
}
}
DFSClientFaultInjector.get().readFromDatanodeDelay();
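
The change above fixes a validation bug: when one destination buffer is filled in several (offset, length) segments, each readAll return value has to be checked against that segment's own length rather than the total request length. A hypothetical, self-contained sketch of the corrected pattern (plain java.io, not HDFS code):

public class SegmentedReadSketch {
  // Reads each (offset, length) segment fully and validates it individually.
  public static void readSegments(java.io.InputStream in, byte[] buf,
      int[] offsets, int[] lengths) throws java.io.IOException {
    for (int i = 0; i < offsets.length; i++) {
      int nread = 0;
      while (nread < lengths[i]) {          // fill this segment completely
        int n = in.read(buf, offsets[i] + nread, lengths[i] - nread);
        if (n < 0) {
          // compare against the segment length, not the total request length
          throw new java.io.IOException("truncated read: expected "
              + lengths[i] + ", got " + nread);
        }
        nread += n;
      }
    }
  }
}
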
@@ -25,6 +25,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@@ -50,7 +51,7 @@
*
* | <- Striped Block Group -> |
* blk_0 blk_1 blk_2 <- A striped block group has
* | | | {@link #groupSize} blocks
* | | | {@link #dataBlkNum} blocks
* v v v
* +------+ +------+ +------+
* |cell_0| |cell_1| |cell_2| <- The logical read order should be
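
For readers unfamiliar with the layout sketched above, the following hypothetical snippet (not part of this patch; the cell size and block count are made-up values) shows how a logical offset in the block group maps to a data block, a cell, and a physical offset inside that block:

// Hypothetical illustration of the cell layout in the diagram above: logical
// bytes are dealt round-robin into cells of cellSize bytes across dataBlkNum
// data blocks.
public class CellLayoutSketch {
  public static void main(String[] args) {
    final int dataBlkNum = 3;
    final int cellSize = 64 * 1024; // assumed cell size, for illustration only

    for (long logicalOffset : new long[] {0, 70_000, 200_000}) {
      long cellIdx = logicalOffset / cellSize;            // cell_0, cell_1, ...
      long blkIdx = cellIdx % dataBlkNum;                 // which blk_i holds the cell
      long cellIdxInBlk = cellIdx / dataBlkNum;           // how deep inside blk_i
      long offsetInBlock = cellIdxInBlk * cellSize + logicalOffset % cellSize;
      System.out.println("logical " + logicalOffset + " -> blk_" + blkIdx
          + ", cell_" + cellIdx + " (cell #" + cellIdxInBlk + " in that block)"
          + ", offsetInBlock=" + offsetInBlock);
    }
  }
}
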
@@ -72,7 +73,7 @@
public class DFSStripedInputStream extends DFSInputStream {
/**
* This method plans the read portion from each block in the stripe
* @param groupSize The size / width of the striping group
* @param dataBlkNum The number of data blocks in the striping group
* @param cellSize The size of each striping cell
* @param startInBlk Starting offset in the striped block
* @param len Length of the read request
@@ -81,29 +82,29 @@ public class DFSStripedInputStream extends DFSInputStream {
* for an individual block in the group
*/
@VisibleForTesting
static ReadPortion[] planReadPortions(final int groupSize,
static ReadPortion[] planReadPortions(final int dataBlkNum,
final int cellSize, final long startInBlk, final int len, int bufOffset) {
ReadPortion[] results = new ReadPortion[groupSize];
for (int i = 0; i < groupSize; i++) {
ReadPortion[] results = new ReadPortion[dataBlkNum];
for (int i = 0; i < dataBlkNum; i++) {
results[i] = new ReadPortion();
}

// cellIdxInBlk is the index of the cell in the block
// E.g., cell_3 is the 2nd cell in blk_0
int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize));
int cellIdxInBlk = (int) (startInBlk / (cellSize * dataBlkNum));

// blkIdxInGroup is the index of the block in the striped block group
// E.g., blk_2 is the 3rd block in the group
final int blkIdxInGroup = (int) (startInBlk / cellSize % groupSize);
final int blkIdxInGroup = (int) (startInBlk / cellSize % dataBlkNum);
results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
startInBlk % cellSize;
boolean crossStripe = false;
for (int i = 1; i < groupSize; i++) {
if (blkIdxInGroup + i >= groupSize && !crossStripe) {
for (int i = 1; i < dataBlkNum; i++) {
if (blkIdxInGroup + i >= dataBlkNum && !crossStripe) {
cellIdxInBlk++;
crossStripe = true;
}
results[(blkIdxInGroup + i) % groupSize].startOffsetInBlock =
results[(blkIdxInGroup + i) % dataBlkNum].startOffsetInBlock =
cellSize * cellIdxInBlk;
}

@@ -112,57 +113,21 @@ static ReadPortion[] planReadPortions(final int groupSize,
results[blkIdxInGroup].lengths.add(firstCellLen);
results[blkIdxInGroup].readLength += firstCellLen;

int i = (blkIdxInGroup + 1) % groupSize;
int i = (blkIdxInGroup + 1) % dataBlkNum;
for (int done = firstCellLen; done < len; done += cellSize) {
ReadPortion rp = results[i];
rp.offsetsInBuf.add(done + bufOffset);
final int readLen = Math.min(len - done, cellSize);
rp.lengths.add(readLen);
rp.readLength += readLen;
i = (i + 1) % groupSize;
i = (i + 1) % dataBlkNum;
}
return results;
}
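
As an illustration of what planReadPortions computes, the standalone sketch below reproduces the same striping arithmetic with made-up parameters (3 data blocks, 64-byte cells, a 200-byte read starting at logical offset 100). It is a hypothetical re-derivation, not the shipped code, but it yields the same per-block start offsets and read lengths:

public class ReadPlanSketch {
  public static void main(String[] args) {
    final int dataBlkNum = 3;     // data blocks in the group (made-up value)
    final int cellSize = 64;      // tiny cell size, for readability
    final long startInBlk = 100;  // logical start offset inside the group
    final int len = 200;          // bytes requested

    // perBlockLen[i] counts how many of the requested bytes come from blk_i;
    // perBlockStart[i] is the first physical offset touched inside blk_i.
    long[] perBlockLen = new long[dataBlkNum];
    long[] perBlockStart = new long[dataBlkNum];
    java.util.Arrays.fill(perBlockStart, -1);

    for (long off = startInBlk; off < startInBlk + len; ) {
      long cellIdx = off / cellSize;               // which logical cell
      int blkIdx = (int) (cellIdx % dataBlkNum);   // which data block holds it
      long cellIdxInBlk = cellIdx / dataBlkNum;    // which cell inside that block
      long offsetInCell = off % cellSize;
      long physicalOffset = cellIdxInBlk * cellSize + offsetInCell;
      long bytesFromCell = Math.min(cellSize - offsetInCell,
          startInBlk + len - off);

      if (perBlockStart[blkIdx] == -1) {
        perBlockStart[blkIdx] = physicalOffset;
      }
      perBlockLen[blkIdx] += bytesFromCell;
      off += bytesFromCell;
    }

    for (int i = 0; i < dataBlkNum; i++) {
      System.out.println("blk_" + i + ": startOffsetInBlock=" + perBlockStart[i]
          + ", readLength=" + perBlockLen[i]);
    }
  }
}
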

/**
* This method parses a striped block group into individual blocks.
*
* @param bg The striped block group
* @param dataBlkNum the number of data blocks
* @return An array containing the blocks in the group
*/
@VisibleForTesting
static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
int dataBlkNum, int cellSize) {
int locatedBGSize = bg.getBlockIndices().length;
// TODO not considering missing blocks for now, only identify data blocks
LocatedBlock[] lbs = new LocatedBlock[dataBlkNum];
for (short i = 0; i < locatedBGSize; i++) {
final int idx = bg.getBlockIndices()[i];
if (idx < dataBlkNum && lbs[idx] == null) {
lbs[idx] = constructInternalBlock(bg, i, cellSize, idx);
}
}
return lbs;
}

private static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
int idxInReturnedLocs, int cellSize, int idxInBlockGroup) {
final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
// TODO: fix the numBytes computation

return new LocatedBlock(blk,
new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
null);
}
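
The helper being moved out of this class derives each internal block from its block group by simple arithmetic: internal block IDs are contiguous starting from the group's base ID, and each data block's start offset is shifted by one cell per index. A hypothetical sketch with made-up numbers (the real helper now lives in StripedBlockUtil and also takes dataBlkNum):

public class InternalBlockSketch {
  public static void main(String[] args) {
    final long groupBlockId = 1000L;   // made-up block-group base ID
    final long groupStartOffset = 0L;  // group's starting offset in the file
    final int cellSize = 64 * 1024;
    final int dataBlkNum = 6;

    for (int idx = 0; idx < dataBlkNum; idx++) {
      long internalBlockId = groupBlockId + idx;                   // contiguous IDs
      long startOffset = groupStartOffset + (long) idx * cellSize; // one cell per index
      System.out.println("blk_" + idx + ": id=" + internalBlockId
          + ", startOffset=" + startOffset);
    }
  }
}
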


private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
private final short dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS;
private final short parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS;

DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
throws IOException {
@@ -199,7 +164,7 @@ protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
"LocatedStripedBlock for a striped file";

int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize)
% groupSize);
% dataBlkNum);
// If indexing information is returned, iterate through the index array
// to find the entry for position idx in the group
LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
@@ -213,7 +178,8 @@ protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
+ blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
}
return constructInternalBlock(lsb, i, cellSize, idx);
return StripedBlockUtil.constructInternalBlock(lsb, i, cellSize,
dataBlkNum, idx);
}

private LocatedBlock getBlockGroupAt(long offset) throws IOException {
@@ -240,13 +206,14 @@ protected void fetchBlockByteRange(long blockStartOffset, long start,
LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;

// Planning the portion of I/O for each shard
ReadPortion[] readPortions = planReadPortions(groupSize, cellSize, start,
ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start,
len, offset);

// Parse group to get chosen DN location
LocatedBlock[] blks = parseStripedBlockGroup(blockGroup, groupSize, cellSize);
LocatedBlock[] blks = StripedBlockUtil.
parseStripedBlockGroup(blockGroup, cellSize, dataBlkNum, parityBlkNum);

for (short i = 0; i < groupSize; i++) {
for (short i = 0; i < dataBlkNum; i++) {
ReadPortion rp = readPortions[i];
if (rp.readLength <= 0) {
continue;
@@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.util.DataChecksum;
@@ -309,17 +310,15 @@ protected void closeThreads(boolean force) throws IOException {
streamer.closeSocket();
if (streamer.isLeadingStreamer()) {
leadingStreamer = streamer;
} else {
streamer.countTailingBlockGroupBytes();
}

} catch (InterruptedException e) {
throw new IOException("Failed to shutdown streamer");
} finally {
streamer.setSocketToNull();
setClosed();
}
}
assert leadingStreamer != null : "One streamer should be leader";
leadingStreamer.countTailingBlockGroupBytes();
}

@@ -337,23 +336,28 @@ public synchronized void write(byte b[], int off, int len)
}

private void writeParityCellsForLastStripe() throws IOException{
if(currentBlockGroupBytes == 0 ||
currentBlockGroupBytes % stripeDataSize() == 0)
long parityBlkSize = StripedBlockUtil.getInternalBlockLength(
currentBlockGroupBytes, cellSize, blockGroupDataBlocks,
blockGroupDataBlocks + 1);
if (parityBlkSize == 0 || currentBlockGroupBytes % stripeDataSize() == 0) {
return;
int lastStripeLen =(int)(currentBlockGroupBytes % stripeDataSize());
// Size of parity cells should equal the size of the first cell, if it
// is not full.
int parityCellSize = cellSize;
int index = lastStripeLen / cellSize;
if (lastStripeLen < cellSize) {
parityCellSize = lastStripeLen;
index++;
}
int parityCellSize = parityBlkSize % cellSize == 0 ? cellSize :
(int) (parityBlkSize % cellSize);

for (int i = 0; i < blockGroupBlocks; i++) {
if (i >= index) {
long internalBlkLen = StripedBlockUtil.getInternalBlockLength(
currentBlockGroupBytes, cellSize, blockGroupDataBlocks, i);
// Pad zero bytes to make all cells exactly the size of parityCellSize
// If internal block is smaller than parity block, pad zero bytes.
// Also pad zero bytes to all parity cells
if (internalBlkLen < parityBlkSize || i >= blockGroupDataBlocks) {
int position = cellBuffers[i].position();
assert position <= parityCellSize : "If an internal block is smaller" +
" than parity block, then its last cell should be small than last" +
" parity cell";
for (int j = 0; j < parityCellSize - position; j++) {
cellBuffers[i].put((byte)0);
cellBuffers[i].put((byte) 0);
}
}
cellBuffers[i].flip();
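
The new padding logic can be summarized with plain arithmetic. The sketch below is hypothetical and assumes, consistently with the code above, that every parity cell of the last (possibly partial) stripe must be as long as the largest data cell of that stripe, so shorter cells are zero-padded before encoding:

public class LastStripePaddingSketch {
  public static void main(String[] args) {
    final int cellSize = 64 * 1024;
    final int dataBlkNum = 6;
    final long bytesInGroup = 5L * cellSize + 1234; // made-up amount of data written

    long lastStripeLen = bytesInGroup % ((long) cellSize * dataBlkNum);
    if (lastStripeLen == 0) {
      System.out.println("last stripe is full: no padding needed");
      return;
    }
    // Data cells that received any bytes in the last stripe, and the length
    // of the final, partially filled cell.
    int fullCells = (int) (lastStripeLen / cellSize);
    int lastCellLen = (int) (lastStripeLen % cellSize);
    // Parity cells match the largest data cell of the stripe; they can only
    // shrink when the whole last stripe fits inside a single partial cell.
    int parityCellSize = fullCells > 0 ? cellSize : lastCellLen;
    System.out.println("fullCells=" + fullCells + ", lastCellLen=" + lastCellLen
        + ", parityCellSize=" + parityCellSize);
  }
}
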
@@ -19,16 +19,16 @@
package org.apache.hadoop.hdfs;

import java.util.List;
import org.apache.hadoop.fs.StorageType;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;

@@ -134,19 +134,7 @@ void countTailingBlockGroupBytes () throws IOException {
"putting a block to stripeBlocks, ie = " + ie);
}
}
} else if (!isParityStreamer()) {
if (block == null || block.getNumBytes() == 0) {
LocatedBlock finishedBlock = new LocatedBlock(null, null);
try {
boolean offSuccess = stripedBlocks.get(0).offer(finishedBlock, 30,
TimeUnit.SECONDS);
} catch (InterruptedException ie) {
//TODO: Handle InterruptedException (HDFS-7786)
ie.printStackTrace();
}
}
}

}

@Override
@@ -155,8 +143,10 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
LocatedBlock lb = null;
if (isLeadingStreamer()) {
if(hasCommittedBlock) {
//when committing a block group, leading streamer has to adjust
// {@link block} including the size of block group
/**
* when committing a block group, leading streamer has to adjust
* {@link block} to include the size of block group
*/
for (int i = 1; i < HdfsConstants.NUM_DATA_BLOCKS; i++) {
try {
LocatedBlock finishedLocatedBlock = stripedBlocks.get(0).poll(30,
@@ -179,7 +169,13 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)

lb = super.locateFollowingBlock(excludedNodes);
hasCommittedBlock = true;
LocatedBlock[] blocks = unwrapBlockGroup(lb);
assert lb instanceof LocatedStripedBlock;
DFSClient.LOG.debug("Leading streamer obtained bg " + lb);
LocatedBlock[] blocks = StripedBlockUtil.
parseStripedBlockGroup((LocatedStripedBlock) lb,
HdfsConstants.BLOCK_STRIPED_CELL_SIZE, HdfsConstants.NUM_DATA_BLOCKS,
HdfsConstants.NUM_PARITY_BLOCKS
);
assert blocks.length == blockGroupSize :
"Fail to get block group from namenode: blockGroupSize: " +
blockGroupSize + ", blocks.length: " + blocks.length;
@@ -212,30 +208,4 @@ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
}
return lb;
}

/**
* Generate other blocks in a block group according to the first one.
*
* @param firstBlockInGroup the first block in a block group
* @return other blocks in this group
*/
public static LocatedBlock[] unwrapBlockGroup(
final LocatedBlock firstBlockInGroup) {
ExtendedBlock eb = firstBlockInGroup.getBlock();
DatanodeInfo[] locs = firstBlockInGroup.getLocations();
String[] storageIDs = firstBlockInGroup.getStorageIDs();
StorageType[] storageTypes = firstBlockInGroup.getStorageTypes();
Token<BlockTokenIdentifier> blockToken = firstBlockInGroup.getBlockToken();
LocatedBlock[] blocksInGroup = new LocatedBlock[locs.length];
for (int i = 0; i < blocksInGroup.length; i++) {
//each block in a group has the same number of bytes and timestamp
ExtendedBlock extendedBlock = new ExtendedBlock(eb.getBlockPoolId(),
eb.getBlockId() + i, eb.getNumBytes(), eb.getGenerationStamp());
blocksInGroup[i] = new LocatedBlock(extendedBlock,
new DatanodeInfo[] {locs[i]}, new String[]{storageIDs[i]},
new StorageType[] {storageTypes[i]});
blocksInGroup[i].setBlockToken(blockToken);
}
return blocksInGroup;
}
}
@@ -83,6 +83,7 @@
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
@@ -1974,8 +1975,8 @@ public boolean processReport(final DatanodeID nodeID,
metrics.addBlockReport((int) (endTime - startTime));
}
blockLog.info("BLOCK* processReport: from storage {} node {}, " +
"blocks: {}, hasStaleStorage: {}, processing time: {} msecs", storage
.getStorageID(), nodeID, newReport.getNumberOfBlocks(),
"blocks: {}, hasStaleStorage: {}, processing time: {} msecs", storage
.getStorageID(), nodeID, newReport.getNumberOfBlocks(),
node.hasStaleStorages(), (endTime - startTime));
return !node.hasStaleStorages();
}
@@ -2002,8 +2003,8 @@ private void removeZombieReplicas(BlockReportContext context,
assert(zombie.numBlocks() == 0);
LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
"which no longer exists on the DataNode.",
Long.toHexString(context.getReportId()), prevBlocks,
zombie.getStorageID());
Long.toHexString(context.getReportId()), prevBlocks,
zombie.getStorageID());
}

/**
@@ -2487,7 +2488,22 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
"block is " + ucState + " and reported genstamp " + reportedGS
+ " does not match genstamp in block map "
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
}
boolean wrongSize;
if (storedBlock.isStriped()) {
assert BlockIdManager.isStripedBlockID(reported.getBlockId());
assert storedBlock.getBlockId() ==
BlockIdManager.convertToStripedID(reported.getBlockId());
BlockInfoStriped stripedBlock = (BlockInfoStriped) storedBlock;
int reportedBlkIdx = BlockIdManager.getBlockIndex(reported);
wrongSize = reported.getNumBytes() !=
getInternalBlockLength(stripedBlock.getNumBytes(),
HdfsConstants.BLOCK_STRIPED_CELL_SIZE,
stripedBlock.getDataBlockNum(), reportedBlkIdx);
} else {
wrongSize = storedBlock.getNumBytes() != reported.getNumBytes();
}
if (wrongSize) {
return new BlockToMarkCorrupt(new Block(reported), storedBlock,
"block is " + ucState + " and reported length " +
reported.getNumBytes() + " does not match " +
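
The striped branch above compares a reported replica's length against the expected length of that internal block. The sketch below is a hypothetical re-derivation of that expectation, written without HDFS types; it assumes dataSize bytes are striped round-robin into cellSize cells over dataBlkNum data blocks, and that parity blocks are as long as the longest data block:

public class InternalBlockLengthSketch {

  static long internalBlockLength(long dataSize, int cellSize,
      int dataBlkNum, int idxInGroup) {
    long fullStripes = dataSize / ((long) cellSize * dataBlkNum);
    long lastStripe = dataSize % ((long) cellSize * dataBlkNum);
    // Parity blocks (idx >= dataBlkNum) are assumed to be as long as the
    // longest data block, which is blk_0.
    int dataIdx = idxInGroup < dataBlkNum ? idxInGroup : 0;
    // Bytes this block receives from the final, possibly partial stripe.
    long inLastStripe = Math.min(
        Math.max(lastStripe - (long) dataIdx * cellSize, 0), cellSize);
    return fullStripes * cellSize + inLastStripe;
  }

  public static void main(String[] args) {
    final int cellSize = 64 * 1024;
    final int dataBlkNum = 6;       // made-up schema: 6 data + 3 parity blocks
    final long dataSize = 5L * cellSize + 1234;

    for (int idx = 0; idx < dataBlkNum + 3; idx++) {
      System.out.println("idx " + idx + " expected length = "
          + internalBlockLength(dataSize, cellSize, dataBlkNum, idx));
    }
  }
}
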
