Skip to content

Commit

Permalink
HDFS-8352. Erasure Coding: test webhdfs read write stripe file. (walt…
Browse files Browse the repository at this point in the history
…ersu4549)
  • Loading branch information
waltersu4549 authored and Zhe Zhang committed May 26, 2015
1 parent c9103e9 commit a919726
Showing 1 changed file with 148 additions and 119 deletions.
Expand Up @@ -21,9 +21,13 @@
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
import org.junit.AfterClass;
import org.junit.Assert;
Expand All @@ -33,23 +37,26 @@
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;

public class TestWriteReadStripedFile {
private static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
private static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;


private static DistributedFileSystem fs;
private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
private final static int stripesPerBlock = 4;
static int blockSize = cellSize * stripesPerBlock;
static int numDNs = dataBlocks + parityBlocks + 2;

private static MiniDFSCluster cluster;
private static Configuration conf;
private static FileSystem fs;

private static Random r= new Random();

@BeforeClass
public static void setup() throws IOException {
Configuration conf = new Configuration();
conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
cluster.getFileSystem().getClient().createErasureCodingZone("/", null);
Expand Down Expand Up @@ -134,7 +141,7 @@ public void testFileMoreThanABlockGroup1() throws IOException {
@Test
public void testFileMoreThanABlockGroup2() throws IOException {
testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
blockSize * dataBlocks + cellSize+ 123);
blockSize * dataBlocks + cellSize + 123);
}


Expand Down Expand Up @@ -171,7 +178,7 @@ private byte getByte(long pos) {
}

private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
int writeBytes) throws IOException {
int writeBytes) throws IOException {
fsdis.seek(pos);
byte[] buf = new byte[writeBytes];
int readLen = readAll(fsdis, buf);
Expand All @@ -182,147 +189,169 @@ private void assertSeekAndRead(FSDataInputStream fsdis, int pos,
}
}

private void testOneFileUsingDFSStripedInputStream(String src, int writeBytes)
private void testOneFileUsingDFSStripedInputStream(String src, int fileLength)
throws IOException {
Path testPath = new Path(src);
final byte[] bytes = generateBytes(writeBytes);
DFSTestUtil.writeFile(fs, testPath, new String(bytes));

//check file length
FileStatus status = fs.getFileStatus(testPath);
long fileLength = status.getLen();
final byte[] expected = generateBytes(fileLength);
Path srcPath = new Path(src);
DFSTestUtil.writeFile(fs, srcPath, new String(expected));

verifyLength(fs, srcPath, fileLength);

byte[] smallBuf = new byte[1024];
byte[] largeBuf = new byte[fileLength + 100];
verifyPread(fs, srcPath, fileLength, expected, largeBuf);

verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
verifySeek(fs, srcPath, fileLength);
verifyStatefulRead(fs, srcPath, fileLength, expected,
ByteBuffer.allocate(fileLength + 100));
verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
verifyStatefulRead(fs, srcPath, fileLength, expected,
ByteBuffer.allocate(1024));
}

@Test
public void testWriteReadUsingWebHdfs() throws Exception {
int fileLength = blockSize * dataBlocks + cellSize + 123;

final byte[] expected = generateBytes(fileLength);
FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
WebHdfsConstants.WEBHDFS_SCHEME);
Path srcPath = new Path("/testWriteReadUsingWebHdfs_stripe");
DFSTestUtil.writeFile(fs, srcPath, new String(expected));

verifyLength(fs, srcPath, fileLength);

byte[] smallBuf = new byte[1024];
byte[] largeBuf = new byte[fileLength + 100];
verifyPread(fs, srcPath, fileLength, expected, largeBuf);

verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
verifySeek(fs, srcPath, fileLength);
verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
//webhdfs doesn't support bytebuffer read

}

void verifyLength(FileSystem fs, Path srcPath, int fileLength)
throws IOException {
FileStatus status = fs.getFileStatus(srcPath);
Assert.assertEquals("File length should be the same",
writeBytes, fileLength);
fileLength, status.getLen());
}

// pread
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
byte[] buf = new byte[writeBytes + 100];
int readLen = fsdis.read(0, buf, 0, buf.length);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
buf[i]);
void verifyPread(FileSystem fs, Path srcPath, int fileLength,
byte[] expected, byte[] buf) throws IOException {
FSDataInputStream in = fs.open(srcPath);
int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
cellSize * dataBlocks, fileLength - 102, fileLength - 1};
for (int startOffset : startOffsets) {
startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
int remaining = fileLength - startOffset;
in.readFully(startOffset, buf, 0, remaining);
for (int i = 0; i < remaining; i++) {
Assert.assertEquals("Byte at " + (startOffset + i) + " should be the " +
"same",
expected[startOffset + i], buf[i]);
}
}
in.close();
}

// stateful read with byte array
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
byte[] buf = new byte[writeBytes + 100];
int readLen = readAll(fsdis, buf);
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
buf[i]);
void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
byte[] expected, byte[] buf) throws IOException {
FSDataInputStream in = fs.open(srcPath);
final byte[] result = new byte[fileLength];
int readLen = 0;
int ret;
do {
ret = in.read(buf, 0, buf.length);
if (ret > 0) {
System.arraycopy(buf, 0, result, readLen, ret);
readLen += ret;
}
}
} while (ret >= 0);
Assert.assertEquals("The length of file should be the same to write size",
fileLength, readLen);
Assert.assertArrayEquals(expected, result);
in.close();
}

// seek and stateful read
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
// seek to 1/2 of content
int pos = writeBytes/2;
assertSeekAndRead(fsdis, pos, writeBytes);

// seek to 1/3 of content
pos = writeBytes/3;
assertSeekAndRead(fsdis, pos, writeBytes);
void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
byte[] expected, ByteBuffer buf) throws IOException {
FSDataInputStream in = fs.open(srcPath);
ByteBuffer result = ByteBuffer.allocate(fileLength);
int readLen = 0;
int ret;
do {
ret = in.read(buf);
if (ret > 0) {
readLen += ret;
buf.flip();
result.put(buf);
buf.clear();
}
} while (ret >= 0);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
fileLength, readLen);
Assert.assertArrayEquals(expected, result.array());
in.close();
}

// seek to 0 pos
pos = 0;
assertSeekAndRead(fsdis, pos, writeBytes);

if (writeBytes > cellSize) {
// seek to cellSize boundary
pos = cellSize -1;
assertSeekAndRead(fsdis, pos, writeBytes);
}
void verifySeek(FileSystem fs, Path srcPath, int fileLength)
throws IOException {
FSDataInputStream in = fs.open(srcPath);
// seek to 1/2 of content
int pos = fileLength / 2;
assertSeekAndRead(in, pos, fileLength);

// seek to 1/3 of content
pos = fileLength / 3;
assertSeekAndRead(in, pos, fileLength);

// seek to 0 pos
pos = 0;
assertSeekAndRead(in, pos, fileLength);

if (fileLength > cellSize) {
// seek to cellSize boundary
pos = cellSize - 1;
assertSeekAndRead(in, pos, fileLength);
}

if (writeBytes > cellSize * dataBlocks) {
// seek to striped cell group boundary
pos = cellSize * dataBlocks - 1;
assertSeekAndRead(fsdis, pos, writeBytes);
}
if (fileLength > cellSize * dataBlocks) {
// seek to striped cell group boundary
pos = cellSize * dataBlocks - 1;
assertSeekAndRead(in, pos, fileLength);
}

if (writeBytes > blockSize * dataBlocks) {
// seek to striped block group boundary
pos = blockSize * dataBlocks - 1;
assertSeekAndRead(fsdis, pos, writeBytes);
}
if (fileLength > blockSize * dataBlocks) {
// seek to striped block group boundary
pos = blockSize * dataBlocks - 1;
assertSeekAndRead(in, pos, fileLength);
}

if(!(in.getWrappedStream() instanceof ByteRangeInputStream)){
try {
fsdis.seek(-1);
in.seek(-1);
Assert.fail("Should be failed if seek to negative offset");
} catch (EOFException e) {
// expected
}

try {
fsdis.seek(writeBytes + 1);
in.seek(fileLength + 1);
Assert.fail("Should be failed if seek after EOF");
} catch (EOFException e) {
// expected
}
}

// stateful read with ByteBuffer
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
ByteBuffer buf = ByteBuffer.allocate(writeBytes + 100);
int readLen = 0;
int ret;
do {
ret = fsdis.read(buf);
if (ret > 0) {
readLen += ret;
}
} while (ret >= 0);
readLen = readLen >= 0 ? readLen : 0;
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
for (int i = 0; i < writeBytes; i++) {
Assert.assertEquals("Byte at " + i + " should be the same", getByte(i),
buf.array()[i]);
}
}

// stateful read with 1KB size byte array
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
final byte[] result = new byte[writeBytes];
final byte[] buf = new byte[1024];
int readLen = 0;
int ret;
do {
ret = fsdis.read(buf, 0, buf.length);
if (ret > 0) {
System.arraycopy(buf, 0, result, readLen, ret);
readLen += ret;
}
} while (ret >= 0);
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
Assert.assertArrayEquals(bytes, result);
}

// stateful read using ByteBuffer with 1KB size
try (FSDataInputStream fsdis = fs.open(new Path(src))) {
final ByteBuffer result = ByteBuffer.allocate(writeBytes);
final ByteBuffer buf = ByteBuffer.allocate(1024);
int readLen = 0;
int ret;
do {
ret = fsdis.read(buf);
if (ret > 0) {
readLen += ret;
buf.flip();
result.put(buf);
buf.clear();
}
} while (ret >= 0);
Assert.assertEquals("The length of file should be the same to write size",
writeBytes, readLen);
Assert.assertArrayEquals(bytes, result.array());
}
in.close();
}

@Test
Expand Down

0 comments on commit a919726

Please sign in to comment.