HDFS-8798. Erasure Coding: fix DFSStripedInputStream/DFSStripedOutputStream
re-fetch token when expired. Contributed by Walter Su.

Jing9 committed Jul 24, 2015
1 parent c2c26e6 commit 95b499a
Showing 8 changed files with 159 additions and 154 deletions.

hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt (3 additions, 0 deletions)

@@ -373,3 +373,6 @@
 
 HDFS-8813. Erasure Coding: Client no need to decode missing parity blocks.
 (Walter Su via jing9)
+
+HDFS-8798. Erasure Coding: fix DFSStripedInputStream/DFSStripedOutputStream
+re-fetch token when expired. (Walter Su via jing9)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java

@@ -20,7 +20,6 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -44,7 +43,6 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -206,44 +204,6 @@ private synchronized void blockSeekTo(long target) throws IOException {
     currentLocatedBlock = targetBlockGroup;
   }
 
-  /**
-   * @throws IOException only when failing to refetch block token, which happens
-   * when this client cannot get located block information from NameNode. This
-   * method returns null instead of throwing exception when failing to connect
-   * to the DataNode.
-   */
-  private BlockReader getBlockReaderWithRetry(LocatedBlock targetBlock,
-      long offsetInBlock, long length, InetSocketAddress targetAddr,
-      StorageType storageType, DatanodeInfo datanode, long offsetInFile,
-      ReaderRetryPolicy retry) throws IOException {
-    // only need to get a new access token or a new encryption key once
-    while (true) {
-      try {
-        return getBlockReader(targetBlock, offsetInBlock, length, targetAddr,
-            storageType, datanode);
-      } catch (IOException e) {
-        if (e instanceof InvalidEncryptionKeyException &&
-            retry.shouldRefetchEncryptionKey()) {
-          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
-              + "encryption key was invalid when connecting to " + targetAddr
-              + " : " + e);
-          dfsClient.clearDataEncryptionKey();
-          retry.refetchEncryptionKey();
-        } else if (retry.shouldRefetchToken() &&
-            tokenRefetchNeeded(e, targetAddr)) {
-          fetchBlockAt(offsetInFile);
-          retry.refetchToken();
-        } else {
-          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
-              + ", add to deadNodes and continue.", e);
-          // Put chosen node into dead list, continue
-          addToDeadNodes(datanode);
-          return null;
-        }
-      }
-    }
-  }
-
   /**
    * Extend the super method with the logic of switching between cells.
    * When reaching the end of a cell, proceed to the next cell and read it
@@ -293,13 +253,13 @@ private void readOneStripe(
     final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
     final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
         - (stripeIndex * stripeLen), stripeLen);
-    curStripeRange = new StripeRange(offsetInBlockGroup,
+    StripeRange stripeRange = new StripeRange(offsetInBlockGroup,
         stripeLimit - stripeBufOffset);
 
     LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
     AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(schema, cellSize,
         blockGroup, offsetInBlockGroup,
-        offsetInBlockGroup + curStripeRange.length - 1, curStripeBuf);
+        offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
         blockGroup, cellSize, dataBlkNum, parityBlkNum);
     // read the whole stripe
@@ -311,6 +271,7 @@ private void readOneStripe(
     }
     curStripeBuf.position(stripeBufOffset);
     curStripeBuf.limit(stripeLimit);
+    curStripeRange = stripeRange;
   }
 
   private Callable<Void> readCells(final BlockReader reader,
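
The readOneStripe() change above is an ordering fix: the range is computed into a local stripeRange and only published to the shared curStripeRange field after the stripe's cells have been read into curStripeBuf, so a failed read can no longer leave the field describing data that never reached the buffer. A minimal sketch of the pattern, with toy names rather than the HDFS classes:

import java.io.IOException;

class PublishAfterSuccess {
  // shared state consulted by later reads, analogous to curStripeRange
  private long[] curRange;

  void readOneStripe(long offset, long length) throws IOException {
    long[] range = {offset, length}; // stays local while the work can fail
    fillBuffer(range);               // may throw; curRange is untouched then
    curRange = range;                // publish only after the buffer is full
  }

  private void fillBuffer(long[] range) throws IOException {
    // stand-in for reading the stripe's cells into the buffer
  }
}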
@@ -423,7 +384,6 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy,
     }
     Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
         new ConcurrentHashMap<>();
-    failures = 0;
     if (pos < getFileLength()) {
       try {
         if (pos > blockEnd) {
@@ -623,13 +583,46 @@ void readParityChunks(int num) throws IOException {
 
     boolean createBlockReader(LocatedBlock block, int chunkIndex)
         throws IOException {
-      DNAddrPair dnInfo = getBestNodeDNAddrPair(block, null);
-      if (dnInfo != null) {
-        BlockReader reader = getBlockReaderWithRetry(block,
-            alignedStripe.getOffsetInBlock(),
-            block.getBlockSize() - alignedStripe.getOffsetInBlock(),
-            dnInfo.addr, dnInfo.storageType, dnInfo.info,
-            block.getStartOffset(), new ReaderRetryPolicy());
+      BlockReader reader = null;
+      final ReaderRetryPolicy retry = new ReaderRetryPolicy();
+      DNAddrPair dnInfo = new DNAddrPair(null, null, null);
+
+      while(true) {
+        try {
+          // the cached block location might have been re-fetched, so always
+          // get it from cache.
+          block = refreshLocatedBlock(block);
+          targetBlocks[chunkIndex] = block;
+
+          // internal block has one location, just rule out the deadNodes
+          dnInfo = getBestNodeDNAddrPair(block, null);
+          if (dnInfo == null) {
+            break;
+          }
+          reader = getBlockReader(block, alignedStripe.getOffsetInBlock(),
+              block.getBlockSize() - alignedStripe.getOffsetInBlock(),
+              dnInfo.addr, dnInfo.storageType, dnInfo.info);
+        } catch (IOException e) {
+          if (e instanceof InvalidEncryptionKeyException &&
+              retry.shouldRefetchEncryptionKey()) {
+            DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+                + "encryption key was invalid when connecting to " + dnInfo.addr
+                + " : " + e);
+            dfsClient.clearDataEncryptionKey();
+            retry.refetchEncryptionKey();
+          } else if (retry.shouldRefetchToken() &&
+              tokenRefetchNeeded(e, dnInfo.addr)) {
+            fetchBlockAt(block.getStartOffset());
+            retry.refetchToken();
+          } else {
+            //TODO: handles connection issues
+            DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
+                "block" + block.getBlock(), e);
+            // re-fetch the block in case the block has been moved
+            fetchBlockAt(block.getStartOffset());
+            addToDeadNodes(dnInfo.info);
+          }
+        }
         if (reader != null) {
           readerInfos[chunkIndex] = new BlockReaderInfo(reader, block,
               dnInfo.info, alignedStripe.getOffsetInBlock());
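
The rewritten createBlockReader() above is the core of HDFS-8798 on the read path: instead of picking a datanode once and delegating to the now-removed getBlockReaderWithRetry(), the loop re-fetches the located block, and with it a fresh block token, before every attempt, allows one token re-fetch and one encryption-key re-fetch, and otherwise rules the node out. A compilable sketch of that retry shape; every name in it (Node, Reader, the two exception types) is a stand-in, not the HDFS API:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

class ReaderRetrySketch {
  static class TokenExpiredException extends IOException {}
  static class KeyInvalidException extends IOException {}
  static class Node {}
  static class Reader {}

  private boolean tokenRefetched = false;   // allow one token re-fetch
  private boolean keyRefetched = false;     // allow one key re-fetch
  private final Set<Node> deadNodes = new HashSet<>();

  Reader createReader(Node node) {
    while (true) {
      if (deadNodes.contains(node)) {
        return null;                        // node already ruled out
      }
      try {
        return connect(node);               // fails if the token or key is stale
      } catch (TokenExpiredException e) {
        if (tokenRefetched) {
          deadNodes.add(node);
          return null;
        }
        tokenRefetched = true;              // fetch a fresh token, then retry
      } catch (KeyInvalidException e) {
        if (keyRefetched) {
          deadNodes.add(node);
          return null;
        }
        keyRefetched = true;                // fetch a fresh key, then retry
      } catch (IOException e) {
        deadNodes.add(node);                // anything else: mark dead, stop
        return null;
      }
    }
  }

  private Reader connect(Node node) throws IOException {
    return new Reader();                    // stand-in for getBlockReader()
  }
}

Returning null and marking the node dead, rather than throwing, keeps the contract the old javadoc described: on the striped read path a missing internal block can still be served by decoding from the surviving blocks, so reader creation should degrade rather than abort.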

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java

@@ -195,12 +195,15 @@ void populate() throws IOException {
         final ExtendedBlock bg = coordinator.getBlockGroup();
         final LocatedBlock updated = callUpdateBlockForPipeline(bg);
         final long newGS = updated.getBlock().getGenerationStamp();
+        final LocatedBlock[] updatedBlks = StripedBlockUtil
+            .parseStripedBlockGroup((LocatedStripedBlock) updated,
+                BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
         for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
           final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
           if (bi != null) {
             final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
                 null, null, null, -1, updated.isCorrupt(), null);
-            lb.setBlockToken(updated.getBlockToken());
+            lb.setBlockToken(updatedBlks[i].getBlockToken());
             newBlocks.offer(i, lb);
           } else {
             final LocatedBlock lb = coordinator.getFollowingBlocks().peek(i);
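
On the write path, the fix above reflects that updateBlockForPipeline() returns a block group carrying one token per internal block; stamping every streamer's block with the single group-level token handed most pipelines a credential minted for a different block, which fails the DataNode's token check when block access tokens are enabled. A toy illustration of the corrected assignment (plain stand-ins, not LocatedStripedBlock or parseStripedBlockGroup):

import java.util.Arrays;

class TokenPerInternalBlock {
  // stand-in for a refreshed block group: a group-level token plus one
  // token per internal block (a hypothetical simplification)
  static class BlockGroup {
    final String groupToken = "token-group";
    final String[] blockTokens = {"token-d0", "token-d1", "token-p0"};

    // analogous to parseStripedBlockGroup: expose the per-block tokens
    String[] parse() {
      return Arrays.copyOf(blockTokens, blockTokens.length);
    }
  }

  public static void main(String[] args) {
    BlockGroup updated = new BlockGroup();
    String[] updatedBlks = updated.parse();
    for (int i = 0; i < updatedBlks.length; i++) {
      // before the fix every streamer was given updated.groupToken;
      // after it, streamer i carries the token minted for its own block
      System.out.println("streamer " + i + " -> " + updatedBlks[i]);
    }
  }
}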

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java

@@ -119,8 +119,8 @@ public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
           bg.getStartOffset(), bg.isCorrupt(), null);
     }
     Token<BlockTokenIdentifier>[] blockTokens = bg.getBlockTokens();
-    if (idxInBlockGroup < blockTokens.length) {
-      locatedBlock.setBlockToken(blockTokens[idxInBlockGroup]);
+    if (idxInReturnedLocs < blockTokens.length) {
+      locatedBlock.setBlockToken(blockTokens[idxInReturnedLocs]);
     }
     return locatedBlock;
   }
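
The one-word change in constructInternalBlock() matters because a striped block group's token array is parallel to the returned locations, not to logical positions in the group: whenever an internal block has no reported location, the two indices diverge. A small self-contained illustration, with plain arrays standing in for the block group's getBlockIndices()/getBlockTokens():

class TokenIndexSketch {
  public static void main(String[] args) {
    // a 5-block group whose internal block #2 has no live location:
    // blockIndices[j] is the index-in-group of the j-th returned location,
    // and blockTokens is parallel to the returned locations
    int[] blockIndices = {0, 1, 3, 4};
    String[] blockTokens = {"t0", "t1", "t3", "t4"};

    int idxInBlockGroup = 3;     // the internal block being constructed
    int idxInReturnedLocs = -1;  // its position among the returned locations
    for (int j = 0; j < blockIndices.length; j++) {
      if (blockIndices[j] == idxInBlockGroup) {
        idxInReturnedLocs = j;   // 2 in this example
      }
    }
    // blockTokens[idxInBlockGroup] would fetch "t4", block #4's token;
    // blockTokens[idxInReturnedLocs] fetches "t3", the correct one
    System.out.println(blockTokens[idxInReturnedLocs]);
  }
}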

(test file; header not captured)

@@ -39,7 +39,6 @@
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.junit.After;
 import org.junit.Assert;
