HDFS-8481. Erasure coding: remove workarounds in client side stripped blocks recovering. Contributed by Zhe Zhang.
Zhe Zhang committed Jun 1, 2015
1 parent 1299357 commit 014bd32
Showing 5 changed files with 60 additions and 36 deletions.
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt: 3 additions & 0 deletions
@@ -262,3 +262,6 @@

HDFS-8479. Erasure coding: fix striping related logic in FSDirWriteFileOp to
sync with HDFS-8421. (Zhe Zhang via jing9)

HDFS-8481. Erasure coding: remove workarounds in client side stripped blocks
recovering. (zhz)
@@ -31,6 +31,7 @@

import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.finalizeDecodeInputs;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getStartOffsetsForInternalBlocks;
@@ -41,6 +42,8 @@

import org.apache.hadoop.io.erasurecode.ECSchema;

import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.net.NetUtils;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
@@ -117,6 +120,8 @@ boolean include(long pos) {
/** the buffer for a complete stripe */
private ByteBuffer curStripeBuf;
private final ECSchema schema;
private final RawErasureDecoder decoder;

/**
* indicate the start/end offset of the current buffered stripe in the
* block group
@@ -139,6 +144,7 @@ boolean include(long pos) {
curStripeRange = new StripeRange(0, 0);
readingService =
new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
decoder = new RSRawDecoder(dataBlkNum, parityBlkNum);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Creating a striped input stream for file " + src);
}
@@ -591,8 +597,9 @@ private void fetchOneStripe(LocatedStripedBlock blockGroup,
}

if (alignedStripe.missingChunksNum > 0) {
decodeAndFillBuffer(decodeInputs, buf, alignedStripe,
dataBlkNum, parityBlkNum);
finalizeDecodeInputs(decodeInputs, alignedStripe);
decodeAndFillBuffer(decodeInputs, buf, alignedStripe, parityBlkNum,
decoder);
}
}
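Taken together, the new read path in this hunk reduces to a three-step recovery flow. The following is a condensed sketch of that flow with the surrounding read logic elided; the names follow the diff, but this is an illustration, not the literal Hadoop source.

// Sketch: recovering missing chunks for one aligned stripe.
// decodeInputs holds one buffer per internal block (data + parity).
byte[][] decodeInputs = initDecodeInputs(alignedStripe, dataBlkNum, parityBlkNum);

// ... issue the reads; extra fetches scheduled by initDecodeInputs
// write straight into the decode input buffers ...

if (alignedStripe.missingChunksNum > 0) {
  // Copy fetched chunks into the inputs, zero-fill ALLZERO chunks,
  // and null out the rows for chunks that are still missing.
  finalizeDecodeInputs(decodeInputs, alignedStripe);
  // Decode with the stream's long-lived RS decoder and fill the
  // caller's buffer with the reconstructed bytes.
  decodeAndFillBuffer(decodeInputs, buf, alignedStripe, parityBlkNum, decoder);
}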

@@ -32,6 +32,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;

import java.util.*;
import java.io.IOException;
@@ -246,19 +247,36 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,

/**
* Initialize the decoding input buffers based on the chunk states in an
* AlignedStripe
* {@link AlignedStripe}. For each chunk that was not initially requested,
* schedule a new fetch request with the decoding input buffer as transfer
* destination.
*/
public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
int dataBlkNum, int parityBlkNum) {
byte[][] decodeInputs =
new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
for (int i = 0; i < alignedStripe.chunks.length; i++) {
StripingChunk chunk = alignedStripe.chunks[i];
if (chunk == null) {
if (alignedStripe.chunks[i] == null) {
alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
alignedStripe.chunks[i].offsetsInBuf.add(0);
alignedStripe.chunks[i].lengthsInBuf.add((int) alignedStripe.getSpanInBlock());
} else if (chunk.state == StripingChunk.FETCHED) {
}
}
return decodeInputs;
}

/**
* Some fetched {@link StripingChunk} might be stored in original application
* buffer instead of prepared decode input buffers. Some others are beyond
* the range of the internal blocks and should correspond to all zero bytes.
* When all pending requests have returned, this method should be called to
* finalize decode input buffers.
*/
public static void finalizeDecodeInputs(final byte[][] decodeInputs,
AlignedStripe alignedStripe) {
for (int i = 0; i < alignedStripe.chunks.length; i++) {
StripingChunk chunk = alignedStripe.chunks[i];
if (chunk.state == StripingChunk.FETCHED) {
int posInBuf = 0;
for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
System.arraycopy(chunk.buf, chunk.offsetsInBuf.get(j),
@@ -267,39 +285,41 @@ public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
}
} else if (chunk.state == StripingChunk.ALLZERO) {
Arrays.fill(decodeInputs[i], (byte)0);
} else {
decodeInputs[i] = null;
}
}
return decodeInputs;
}

/**
* Decode based on the given input buffers and schema
* Decode based on the given input buffers and schema.
*/
public static void decodeAndFillBuffer(final byte[][] decodeInputs, byte[] buf,
AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum) {
public static void decodeAndFillBuffer(final byte[][] decodeInputs,
byte[] buf, AlignedStripe alignedStripe, int parityBlkNum,
RawErasureDecoder decoder) {
// Step 1: prepare indices and output buffers for missing data units
int[] decodeIndices = new int[parityBlkNum];
int pos = 0;
for (int i = 0; i < alignedStripe.chunks.length; i++) {
if (alignedStripe.chunks[i].state != StripingChunk.FETCHED &&
alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
if (alignedStripe.chunks[i].state == StripingChunk.MISSING){
decodeIndices[pos++] = i;
}
}
decodeIndices = Arrays.copyOf(decodeIndices, pos);
byte[][] decodeOutputs =
new byte[decodeIndices.length][(int) alignedStripe.getSpanInBlock()];

byte[][] outputs = new byte[parityBlkNum][(int) alignedStripe.getSpanInBlock()];
RSRawDecoder rsRawDecoder = new RSRawDecoder(dataBlkNum, parityBlkNum);
rsRawDecoder.decode(decodeInputs, decodeIndices, outputs);
// Step 2: decode into prepared output buffers
decoder.decode(decodeInputs, decodeIndices, decodeOutputs);

for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
StripingChunk chunk = alignedStripe.chunks[i];
// Step 3: fill original application buffer with decoded data
for (int i = 0; i < decodeIndices.length; i++) {
int missingBlkIdx = decodeIndices[i];
StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
if (chunk.state == StripingChunk.MISSING) {
int srcPos = 0;
for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
//TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
// System.arraycopy(outputs[i], srcPos, buf, chunk.offsetsInBuf.get(j),
// chunk.lengthsInBuf.get(j));
Arrays.fill(buf, chunk.offsetsInBuf.get(j),
chunk.offsetsInBuf.get(j) + chunk.lengthsInBuf.get(j), (byte)7);
System.arraycopy(decodeOutputs[i], srcPos, buf, chunk.offsetsInBuf.get(j),
chunk.lengthsInBuf.get(j));
srcPos += chunk.lengthsInBuf.get(j);
}
}
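The decode call above relies on the RawErasureDecoder contract that this change makes explicit: inputs are ordered data units first and then parity units, erased positions are passed as null, erasedIndexes names the missing units, and outputs are filled in the same order as erasedIndexes. Below is a minimal self-contained sketch of that contract, using an RS(6,3) layout for illustration; it assumes RSRawEncoder offers the same (numDataUnits, numParityUnits) constructor style and an encode(inputs, outputs) method, neither of which appears in this diff.

import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;

import java.util.Arrays;

public class RsDecodeContractDemo {
  public static void main(String[] args) {
    final int dataBlkNum = 6, parityBlkNum = 3, cellSize = 16;

    // Fill six data units with recognizable bytes.
    byte[][] data = new byte[dataBlkNum][cellSize];
    for (int i = 0; i < dataBlkNum; i++) {
      Arrays.fill(data[i], (byte) (i + 1));
    }

    // Assumption: encoder API mirrors the decoder API used in this commit.
    byte[][] parity = new byte[parityBlkNum][cellSize];
    new RSRawEncoder(dataBlkNum, parityBlkNum).encode(data, parity);

    // Inputs: data units first, then parity; the erased unit 2 becomes
    // null, exactly as finalizeDecodeInputs does for MISSING chunks.
    byte[][] inputs = new byte[dataBlkNum + parityBlkNum][];
    System.arraycopy(data, 0, inputs, 0, dataBlkNum);
    System.arraycopy(parity, 0, inputs, dataBlkNum, parityBlkNum);
    inputs[2] = null;

    int[] erasedIndexes = {2};
    byte[][] outputs = new byte[erasedIndexes.length][cellSize];
    new RSRawDecoder(dataBlkNum, parityBlkNum)
        .decode(inputs, erasedIndexes, outputs);

    // outputs[0] corresponds to erasedIndexes[0], i.e. the lost unit 2.
    System.out.println(Arrays.equals(outputs[0], data[2])); // expect true
  }
}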
@@ -221,13 +221,13 @@ public void testPreadWithDNFailure() throws Exception {
decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte(
new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk);
}
// RSRawDecoder rsRawDecoder = new RSRawDecoder();
// rsRawDecoder.initialize(DATA_BLK_NUM, PARITY_BLK_NUM, CELLSIZE);
// rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
for (int m : missingBlkIdx) {
decodeInputs[m] = null;
}
RSRawDecoder rsRawDecoder = new RSRawDecoder(DATA_BLK_NUM, PARITY_BLK_NUM);
rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
// System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
//TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
Arrays.fill(expected, posInBuf, posInBuf + CELLSIZE, (byte)7);
System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
}
int delta = 10;
int done = 0;
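Note the ordering convention the updated test depends on: decode outputs are positionally matched to erasedIndexes. Here missingBlkIdx[0] is the failed datanode's block index, so decodeOutputs[0] is the reconstructed cell that gets copied into expected; this is the same decodeIndices[i]-to-decodeOutputs[i] mapping used by decodeAndFillBuffer above.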
@@ -382,15 +382,9 @@ public void testWritePreadWithDNFailure() throws IOException {
Assert.assertEquals("The length of file should be the same to write size",
length - startOffsetInFile, readLen);

RSRawDecoder rsRawDecoder = new RSRawDecoder(dataBlocks, parityBlocks);
byte[] expected = new byte[readLen];
for (int i = startOffsetInFile; i < length; i++) {
//TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
if ((i / cellSize) % dataBlocks == failedDNIdx) {
expected[i - startOffsetInFile] = (byte)7;
} else {
expected[i - startOffsetInFile] = getByte(i);
}
expected[i - startOffsetInFile] = getByte(i);
}
for (int i = startOffsetInFile; i < length; i++) {
Assert.assertEquals("Byte at " + i + " should be the same",
