Skip to content

Commit

Permalink
HDFS-8203. Erasure Coding: Seek and other Ops in DFSStripedInputStrea…
Browse files Browse the repository at this point in the history
…m. Contributed by Yi Liu.
  • Loading branch information
Jing9 authored and Zhe Zhang committed May 26, 2015
1 parent ac97edd commit a17cedb
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 23 deletions.
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
Expand Up @@ -186,3 +186,6 @@


HDFS-8129. Erasure Coding: Maintain consistent naming for Erasure Coding related classes - EC/ErasureCoding HDFS-8129. Erasure Coding: Maintain consistent naming for Erasure Coding related classes - EC/ErasureCoding
(umamahesh) (umamahesh)

HDFS-8203. Erasure Coding: Seek and other Ops in DFSStripedInputStream.
(Yi Liu via jing9)
Expand Up @@ -19,10 +19,13 @@


import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.*; import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ByteBufferPool;

import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ReadPortion;
import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions; import static org.apache.hadoop.hdfs.util.StripedBlockUtil.planReadPortions;


Expand All @@ -31,9 +34,11 @@
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;


import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.Set; import java.util.Set;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
Expand Down Expand Up @@ -263,6 +268,10 @@ protected void closeCurrentBlockReaders() {
} }


private long getOffsetInBlockGroup() { private long getOffsetInBlockGroup() {
return getOffsetInBlockGroup(pos);
}

private long getOffsetInBlockGroup(long pos) {
return pos - currentLocatedBlock.getStartOffset(); return pos - currentLocatedBlock.getStartOffset();
} }


Expand All @@ -278,18 +287,22 @@ private void readOneStripe(
// compute stripe range based on pos // compute stripe range based on pos
final long offsetInBlockGroup = getOffsetInBlockGroup(); final long offsetInBlockGroup = getOffsetInBlockGroup();
final long stripeLen = cellSize * dataBlkNum; final long stripeLen = cellSize * dataBlkNum;
int stripeIndex = (int) (offsetInBlockGroup / stripeLen); final int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
curStripeRange = new StripeRange(stripeIndex * stripeLen, final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
Math.min(currentLocatedBlock.getBlockSize() - (stripeIndex * stripeLen), final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
stripeLen)); - (stripeIndex * stripeLen), stripeLen);
final int numCell = (int) ((curStripeRange.length - 1) / cellSize + 1); curStripeRange = new StripeRange(offsetInBlockGroup,
stripeLimit - stripeBufOffset);

final int startCell = stripeBufOffset / cellSize;
final int numCell = (stripeLimit - 1) / cellSize + 1;


// read the whole stripe in parallel // read the whole stripe in parallel
Map<Future<Integer>, Integer> futures = new HashMap<>(); Map<Future<Integer>, Integer> futures = new HashMap<>();
for (int i = 0; i < numCell; i++) { for (int i = startCell; i < numCell; i++) {
curStripeBuf.position(cellSize * i); int bufPos = i == startCell ? stripeBufOffset : cellSize * i;
curStripeBuf.limit((int) Math.min(cellSize * (i + 1), curStripeBuf.position(bufPos);
curStripeRange.length)); curStripeBuf.limit(Math.min(cellSize * (i + 1), stripeLimit));
ByteBuffer buf = curStripeBuf.slice(); ByteBuffer buf = curStripeBuf.slice();
ByteBufferStrategy strategy = new ByteBufferStrategy(buf); ByteBufferStrategy strategy = new ByteBufferStrategy(buf);
final int targetLength = buf.remaining(); final int targetLength = buf.remaining();
Expand Down Expand Up @@ -329,6 +342,39 @@ public Integer call() throws Exception {
}; };
} }


/**
* Seek to a new arbitrary location
*/
@Override
public synchronized void seek(long targetPos) throws IOException {
if (targetPos > getFileLength()) {
throw new EOFException("Cannot seek after EOF");
}
if (targetPos < 0) {
throw new EOFException("Cannot seek to negative offset");
}
if (closed.get()) {
throw new IOException("Stream is closed!");
}
if (targetPos <= blockEnd) {
final long targetOffsetInBlk = getOffsetInBlockGroup(targetPos);
if (curStripeRange.include(targetOffsetInBlk)) {
int bufOffset = getStripedBufOffset(targetOffsetInBlk);
curStripeBuf.position(bufOffset);
pos = targetPos;
return;
}
}
pos = targetPos;
blockEnd = -1;
}

private int getStripedBufOffset(long offsetInBlockGroup) {
final long stripeLen = cellSize * dataBlkNum;
// compute the position in the curStripeBuf based on "pos"
return (int) (offsetInBlockGroup % stripeLen);
}

@Override @Override
protected synchronized int readWithStrategy(ReaderStrategy strategy, protected synchronized int readWithStrategy(ReaderStrategy strategy,
int off, int len) throws IOException { int off, int len) throws IOException {
Expand Down Expand Up @@ -405,10 +451,8 @@ private int readBuffer(BlockReader blockReader,
* @return number of bytes copied * @return number of bytes copied
*/ */
private int copy(ReaderStrategy strategy, int offset, int length) { private int copy(ReaderStrategy strategy, int offset, int length) {
final long stripeLen = cellSize * dataBlkNum; final long offsetInBlk = getOffsetInBlockGroup();
final long offsetInBlk = pos - currentLocatedBlock.getStartOffset(); int bufOffset = getStripedBufOffset(offsetInBlk);
// compute the position in the curStripeBuf based on "pos"
int bufOffset = (int) (offsetInBlk % stripeLen);
curStripeBuf.position(bufOffset); curStripeBuf.position(bufOffset);
return strategy.copyFrom(curStripeBuf, offset, return strategy.copyFrom(curStripeBuf, offset,
Math.min(length, curStripeBuf.remaining())); Math.min(length, curStripeBuf.remaining()));
Expand Down Expand Up @@ -546,4 +590,22 @@ private <T> void waitNextCompletion(CompletionService<T> service,
} }
throw new InterruptedException("let's retry"); throw new InterruptedException("let's retry");
} }

/**
* May need online read recovery, zero-copy read doesn't make
* sense, so don't support it.
*/
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
int maxLength, EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException {
throw new UnsupportedOperationException(
"Not support enhanced byte buffer access.");
}

@Override
public synchronized void releaseBuffer(ByteBuffer buffer) {
throw new UnsupportedOperationException(
"Not support enhanced byte buffer access.");
}
} }
Expand Up @@ -22,12 +22,12 @@
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;

import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;


import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;


Expand Down Expand Up @@ -150,11 +150,35 @@ private byte[] generateBytes(int cnt) {
return bytes; return bytes;
} }


private int readAll(FSDataInputStream in, byte[] buf) throws IOException {
int readLen = 0;
int ret;
do {
ret = in.read(buf, readLen, buf.length - readLen);
if (ret > 0) {
readLen += ret;
}
} while (ret >= 0 && readLen < buf.length);
return readLen;
}

private byte getByte(long pos) { private byte getByte(long pos) {
final int mod = 29; final int mod = 29;
return (byte) (pos % mod + 1); return (byte) (pos % mod + 1);
} }


private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
int writeBytes) throws IOException {
fsdis.seek(pos);
byte[] buf = new byte[writeBytes];
int readLen = readAll(fsdis, buf);
Assert.assertEquals(readLen, writeBytes - pos);
for (int i = 0; i < readLen; i++) {
Assert.assertEquals("Byte at " + i + " should be the same",
getByte(pos + i), buf[i]);
}
}

private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes) private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
throws IOException { throws IOException {
Path testPath = new Path(src); Path testPath = new Path(src);
Expand Down Expand Up @@ -183,15 +207,7 @@ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
// stateful read with byte array // stateful read with byte array
try (FSDataInputStream fsdis = fs.open(new Path(src))) { try (FSDataInputStream fsdis = fs.open(new Path(src))) {
byte[] buf = new byte[writeBytes + 100]; byte[] buf = new byte[writeBytes + 100];
int readLen = 0; int readLen = readAll(fsdis, buf);
int ret;
do {
ret = fsdis.read(buf, readLen, buf.length - readLen);
if (ret > 0) {
readLen += ret;
}
} while (ret >= 0);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size", Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen); writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) { for (int i = 0; i < writeBytes; i++) {
Expand All @@ -200,6 +216,53 @@ private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
} }
} }


// seek and stateful read
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
// seek to 1/2 of content
int pos = writeBytes/2;
assertSeekAndRead(fsdis, pos, writeBytes);

// seek to 1/3 of content
pos = writeBytes/3;
assertSeekAndRead(fsdis, pos, writeBytes);

// seek to 0 pos
pos = 0;
assertSeekAndRead(fsdis, pos, writeBytes);

if (writeBytes > cellSize) {
// seek to cellSize boundary
pos = cellSize -1;
assertSeekAndRead(fsdis, pos, writeBytes);
}

if (writeBytes > cellSize * dataBlocks) {
// seek to striped cell group boundary
pos = cellSize * dataBlocks - 1;
assertSeekAndRead(fsdis, pos, writeBytes);
}

if (writeBytes > blockSize * dataBlocks) {
// seek to striped block group boundary
pos = blockSize * dataBlocks - 1;
assertSeekAndRead(fsdis, pos, writeBytes);
}

try {
fsdis.seek(-1);
Assert.fail("Should be failed if seek to negative offset");
} catch (EOFException e) {
// expected
}

try {
fsdis.seek(writeBytes + 1);
Assert.fail("Should be failed if seek after EOF");
} catch (EOFException e) {
// expected
}
}

// stateful read with ByteBuffer // stateful read with ByteBuffer
try (FSDataInputStream fsdis = fs.open(new Path(src))) { try (FSDataInputStream fsdis = fs.open(new Path(src))) {
ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100); ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
Expand Down

0 comments on commit a17cedb

Please sign in to comment.