HDFS-7678. Erasure coding: DFSInputStream with decode functionality (pread). Contributed by Zhe Zhang.
zhe-thoughts authored and Zhe Zhang committed May 26, 2015
1 parent 6bacaa9 commit 8d3030f
Showing 6 changed files with 768 additions and 72 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -195,3 +195,6 @@

HDFS-8355. Erasure Coding: Refactor BlockInfo and BlockInfoUnderConstruction.
(Tsz Wo Nicholas Sze via jing9)

HDFS-7678. Erasure coding: DFSInputStream with decode functionality (pread).
(Zhe Zhang)
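For orientation before the patch itself, the sketch below is a standalone illustration, not code from this commit; the class name and the constants CELL_SIZE, DATA_BLOCKS and PARITY_BLOCKS are assumed values. It shows the core idea of a striped pread: the requested byte range is cut at cell boundaries, each piece maps round-robin onto one of the data blocks of the group, and the parity blocks give a per-stripe budget of failures that can still be decoded around.

public class StripedPreadSketch {
  static final int CELL_SIZE = 64 * 1024; // illustrative cell size
  static final int DATA_BLOCKS = 6;       // data blocks per group (RS-6-3 style)
  static final int PARITY_BLOCKS = 3;     // parity blocks per group

  /** Data block that serves the byte at this offset within the block group. */
  static int blockIndexFor(long offsetInGroup) {
    return (int) ((offsetInGroup / CELL_SIZE) % DATA_BLOCKS);
  }

  public static void main(String[] args) {
    long start = 200_000, len = 300_000;
    // Cut the range at cell boundaries; the real client issues these chunk
    // reads in parallel and falls back to parity plus decode when one fails.
    for (long pos = start; pos < start + len; ) {
      long cellEnd = (pos / CELL_SIZE + 1) * CELL_SIZE;
      long chunkLen = Math.min(cellEnd, start + len) - pos;
      System.out.printf("read %d bytes at offset %d from data block #%d%n",
          chunkLen, pos, blockIndexFor(pos));
      pos += chunkLen;
    }
    // Per stripe, at most PARITY_BLOCKS chunks may be missing before the
    // stripe can no longer be reconstructed.
  }
}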
@@ -21,26 +21,40 @@
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ByteBufferPool;

import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;

import org.apache.hadoop.io.erasurecode.ECSchema;

import org.apache.hadoop.net.NetUtils;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.Set;
import java.util.Collection;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.CompletionService;
@@ -51,7 +65,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Future;


/******************************************************************************
* DFSStripedInputStream reads from striped block groups, illustrated below:
*
@@ -125,6 +138,7 @@ boolean include(long pos) {
private final short parityBlkNum;
/** the buffer for a complete stripe */
private ByteBuffer curStripeBuf;
private final ECSchema schema;
/**
* indicate the start/end offset of the current buffered stripe in the
* block group
@@ -137,6 +151,7 @@ boolean include(long pos) {
super(dfsClient, src, verifyChecksum);

assert schema != null;
this.schema = schema;
cellSize = schema.getChunkSize();
dataBlkNum = (short) schema.getNumDataUnits();
parityBlkNum = (short) schema.getNumParityUnits();
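As a quick reference for the fields above, assuming an RS-6-3 schema with 64 KB cells (illustrative numbers, not taken from this diff), the resulting stripe geometry would be:

public class StripeGeometrySketch {
  public static void main(String[] args) {
    int cellSize = 64 * 1024, dataBlkNum = 6, parityBlkNum = 3; // assumed values
    System.out.println("data per stripe:   " + cellSize * dataBlkNum);                  // 393216 bytes (384 KB)
    System.out.println("stored per stripe: " + cellSize * (dataBlkNum + parityBlkNum)); // 589824 bytes (576 KB)
    System.out.println("internal blocks:   " + (dataBlkNum + parityBlkNum));            // 9 per block group
  }
}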
@@ -472,12 +487,10 @@ private int copy(ReaderStrategy strategy, int offset, int length) {
*/
@Override
protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
LocatedBlock lb = super.getBlockAt(blkStartOffset);
assert lb instanceof LocatedStripedBlock : "NameNode should return a " +
"LocatedStripedBlock for a striped file";
LocatedBlock lb = getBlockGroupAt(blkStartOffset);

int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize)
% dataBlkNum);
int idx = (int) ((blkStartOffset - lb.getStartOffset())
% (dataBlkNum + parityBlkNum));
// If indexing information is returned, iterate through the index array
// to find the entry for position idx in the group
LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
@@ -509,48 +522,121 @@ protected void fetchBlockByteRange(long blockStartOffset, long start,
long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
Map<Future<Void>, Integer> futures = new HashMap<>();
CompletionService<Void> stripedReadsService =
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
int len = (int) (end - start + 1);

// Refresh the striped block group
LocatedStripedBlock blockGroup = getBlockGroupAt(blockStartOffset);

AlignedStripe[] stripes = divideByteRangeIntoStripes(schema, blockGroup,
start, end, buf, offset);
for (AlignedStripe stripe : stripes) {
fetchOneStripe(blockGroup, buf, stripe, corruptedBlockMap);
}
}

// Planning the portion of I/O for each shard
ReadPortion[] readPortions = planReadPortions(dataBlkNum, cellSize, start,
len, offset);

private void fetchOneStripe(LocatedStripedBlock blockGroup,
byte[] buf, AlignedStripe alignedStripe, Map<ExtendedBlock,
Set<DatanodeInfo>> corruptedBlockMap) throws IOException {
Map<Future<Void>, Integer> futures = new HashMap<>();
CompletionService<Void> service =
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
if (alignedStripe.getSpanInBlock() == 0) {
DFSClient.LOG.warn("Trying to read an empty stripe from" + blockGroup);
return;
}
// Parse group to get chosen DN location
LocatedBlock[] blks = StripedBlockUtil.
parseStripedBlockGroup(blockGroup, cellSize, dataBlkNum, parityBlkNum);

for (short i = 0; i < dataBlkNum; i++) {
ReadPortion rp = readPortions[i];
if (rp.getReadLength() <= 0) {
continue;
if (alignedStripe.chunks[i] != null
&& alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i,
corruptedBlockMap);
}
DatanodeInfo loc = blks[i].getLocations()[0];
StorageType type = blks[i].getStorageTypes()[0];
DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
type);
Callable<Void> readCallable = getFromOneDataNode(dnAddr,
blks[i].getStartOffset(), rp.getStartOffsetInBlock(),
rp.getStartOffsetInBlock() + rp.getReadLength() - 1, buf,
rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
futures.put(getFromDNRequest, (int) i);
}
// Input buffers for potential decode operation, which remains null until
// first read failure
byte[][] decodeInputs = null;
while (!futures.isEmpty()) {
try {
waitNextCompletion(stripedReadsService, futures);
StripingChunkReadResult r = getNextCompletedStripedRead(
service, futures, 0);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Read task returned: " + r + ", for stripe " + alignedStripe);
}
StripingChunk returnedChunk = alignedStripe.chunks[r.index];
Preconditions.checkNotNull(returnedChunk);
Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
if (r.state == StripingChunkReadResult.SUCCESSFUL) {
returnedChunk.state = StripingChunk.FETCHED;
alignedStripe.fetchedChunksNum++;
if (alignedStripe.fetchedChunksNum == dataBlkNum) {
clearFutures(futures.keySet());
break;
}
} else {
returnedChunk.state = StripingChunk.MISSING;
alignedStripe.missingChunksNum++;
if (alignedStripe.missingChunksNum > parityBlkNum) {
clearFutures(futures.keySet());
throw new IOException("Too many blocks are missing: " + alignedStripe);
}
// When seeing first missing block, initialize decode input buffers
if (decodeInputs == null) {
decodeInputs = initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum);
}
for (int i = 0; i < alignedStripe.chunks.length; i++) {
StripingChunk chunk = alignedStripe.chunks[i];
Preconditions.checkNotNull(chunk);
if (chunk.state == StripingChunk.REQUESTED && i <= dataBlkNum) {
fetchOneStripingChunk(futures, service, blks[i], alignedStripe, i,
corruptedBlockMap);
}
}
}
} catch (InterruptedException ie) {
// Ignore and retry
String err = "Read request interrupted";
DFSClient.LOG.error(err);
clearFutures(futures.keySet());
// Don't decode if read interrupted
throw new InterruptedIOException(err);
}
}

if (alignedStripe.missingChunksNum > 0) {
decodeAndFillBuffer(decodeInputs, buf, alignedStripe,
dataBlkNum, parityBlkNum);
}
}

/**
* Schedule a single read request to an internal block
* @param block The internal block
* @param index Index of the internal block in the group
* @param corruptedBlockMap Map of corrupted blocks
*/
private void fetchOneStripingChunk(Map<Future<Void>, Integer> futures,
final CompletionService<Void> service, final LocatedBlock block,
final AlignedStripe alignedStripe, final int index,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
DatanodeInfo loc = block.getLocations()[0];
StorageType type = block.getStorageTypes()[0];
DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
loc.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname())),
type);
StripingChunk chunk = alignedStripe.chunks[index];
chunk.state = StripingChunk.PENDING;
Callable<Void> readCallable = getFromOneDataNode(dnAddr,
block.getStartOffset(), alignedStripe.getOffsetInBlock(),
alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1, chunk.buf,
chunk.getOffsets(), chunk.getLengths(),
corruptedBlockMap, index);
Future<Void> getFromDNRequest = service.submit(readCallable);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Submitting striped read request for " + index +
". Info of the block: " + block + ", offset in block is " +
alignedStripe.getOffsetInBlock() + ", end is " +
(alignedStripe.getOffsetInBlock() + alignedStripe.getSpanInBlock() - 1));
}
futures.put(getFromDNRequest, index);
}

private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
@@ -609,4 +695,12 @@ public synchronized void releaseBuffer(ByteBuffer buffer) {
throw new UnsupportedOperationException(
"Not support enhanced byte buffer access.");
}

/** A variation to {@link DFSInputStream#cancelAll} */
private void clearFutures(Collection<Future<Void>> futures) {
for (Future<Void> future : futures) {
future.cancel(false);
}
futures.clear();
}
}
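The methods above follow a submit-then-drain pattern. The standalone sketch below (assumed names, plain JDK classes only, not part of the patch) mirrors that structure: each chunk read is a Callable whose Future is remembered together with its chunk index, completions are drained from the CompletionService, and the read finishes once dataBlkNum chunks arrive or aborts once more than parityBlkNum of them have failed.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

public class ChunkReadSchedulerSketch {
  static final int DATA_BLOCKS = 6;   // assumed RS-6-3 style group
  static final int PARITY_BLOCKS = 3;

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(DATA_BLOCKS);
    CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
    Map<Future<Integer>, Integer> futures = new HashMap<>();

    for (int i = 0; i < DATA_BLOCKS; i++) {
      final int index = i;
      // Each task stands in for one internal-block read and returns its index.
      futures.put(service.submit(() -> index), index);
    }

    int fetched = 0, missing = 0;
    try {
      while (!futures.isEmpty()) {
        Future<Integer> done = service.take();     // next completed read
        int index = futures.remove(done);
        try {
          done.get();
          fetched++;
          System.out.println("chunk " + index + " fetched");
          if (fetched == DATA_BLOCKS) {
            break;                                 // enough chunks to serve the read
          }
        } catch (ExecutionException e) {
          missing++;                               // in this sketch no task fails
          if (missing > PARITY_BLOCKS) {
            throw new IOException("Too many missing chunks", e);
          }
          // A real client would now schedule a parity-chunk read and later
          // decode the missing data chunk from the surviving ones.
        }
      }
    } finally {
      pool.shutdownNow();
    }
  }
}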
@@ -67,7 +67,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripedReadResult;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
@@ -462,10 +462,10 @@ private int readMinimumStripedData4Recovery(int[] success) {
int nsuccess = 0;
while (!futures.isEmpty()) {
try {
StripedReadResult result =
StripingChunkReadResult result =
StripedBlockUtil.getNextCompletedStripedRead(
readService, futures, STRIPED_READ_THRESHOLD_MILLIS);
if (result.state == StripedReadResult.SUCCESSFUL) {
if (result.state == StripingChunkReadResult.SUCCESSFUL) {
success[nsuccess++] = result.index;
if (nsuccess >= dataBlkNum) {
// cancel remaining reads if we read successfully from minimum
@@ -474,14 +474,14 @@ private int readMinimumStripedData4Recovery(int[] success) {
futures.clear();
break;
}
} else if (result.state == StripedReadResult.FAILED) {
} else if (result.state == StripingChunkReadResult.FAILED) {
// If read failed for some source, we should not use it anymore
// and schedule read from a new source.
StripedReader failedReader = stripedReaders.get(result.index);
closeBlockReader(failedReader.blockReader);
failedReader.blockReader = null;
scheduleNewRead(used);
} else if (result.state == StripedReadResult.TIMEOUT) {
} else if (result.state == StripingChunkReadResult.TIMEOUT) {
// If timeout, we also schedule a new read.
scheduleNewRead(used);
}
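For completeness, the SUCCESSFUL/FAILED/TIMEOUT handling above boils down to a timed poll on the CompletionService. A minimal sketch of that step, using only JDK classes and assumed names (it is not the Hadoop utility itself), could look like this:

import java.util.concurrent.*;

public class TimedStripedPollSketch {
  enum State { SUCCESSFUL, FAILED, TIMEOUT }

  static State pollOnce(CompletionService<Void> service, long thresholdMillis)
      throws InterruptedException {
    Future<Void> f = service.poll(thresholdMillis, TimeUnit.MILLISECONDS);
    if (f == null) {
      return State.TIMEOUT;     // nothing finished within the threshold
    }
    try {
      f.get();
      return State.SUCCESSFUL;  // the read delivered its chunk
    } catch (ExecutionException e) {
      return State.FAILED;      // the read threw; schedule a new source
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CompletionService<Void> service = new ExecutorCompletionService<>(pool);
    service.submit(() -> null);  // stands in for one block-reader task
    System.out.println(pollOnce(service, 100));
    pool.shutdownNow();
  }
}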
