Commit e9d85bb

HDFS-8216. TestDFSStripedOutputStream should use BlockReaderTestUtil to create BlockReader. Contributed by Tsz Wo Nicholas Sze.
zhe-thoughts authored and Zhe Zhang committed May 26, 2015
1 parent e107886 commit e9d85bb
Showing 4 changed files with 20 additions and 64 deletions.
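Distilled from the diffs below, the shape of the change at each call site (`cluster`, `lblock`, and `TEST_FILE_LEN` are the fixtures those tests already use):

    // Before: the test utility took the whole MiniDFSCluster and looked up
    // the FileSystem itself; the static overload's length parameter was an int.
    blockReader = BlockReaderTestUtil.
        getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);

    // After: callers pass the DistributedFileSystem directly, and the
    // length parameter of the static overload widens to long.
    blockReader = BlockReaderTestUtil.getBlockReader(
        cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);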
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt (3 additions & 0 deletions)

@@ -107,3 +107,6 @@
 
     HDFS-8190. StripedBlockUtil.getInternalBlockLength may have overflow error.
     (szetszwo)
+
+    HDFS-8216. TestDFSStripedOutputStream should use BlockReaderTestUtil to
+    create BlockReader. (szetszwo via Zhe Zhang)
BlockReaderTestUtil.java

@@ -165,20 +165,19 @@ public void readAndCheckEOS(BlockReader reader, int length, boolean expectEof)
    */
   public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
       throws IOException {
-    return getBlockReader(cluster, testBlock, offset, lenToRead);
+    return getBlockReader(cluster.getFileSystem(), testBlock, offset, lenToRead);
   }
 
   /**
    * Get a BlockReader for the given block.
    */
-  public static BlockReader getBlockReader(MiniDFSCluster cluster,
-      LocatedBlock testBlock, int offset, int lenToRead) throws IOException {
+  public static BlockReader getBlockReader(final DistributedFileSystem fs,
+      LocatedBlock testBlock, int offset, long lenToRead) throws IOException {
     InetSocketAddress targetAddr = null;
     ExtendedBlock block = testBlock.getBlock();
     DatanodeInfo[] nodes = testBlock.getLocations();
     targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
 
-    final DistributedFileSystem fs = cluster.getFileSystem();
     return new BlockReaderFactory(fs.getClient().getConf()).
         setInetSocketAddress(targetAddr).
         setBlock(block).
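As a usage note, a minimal sketch of how a test could read one block back through the refactored helper; the path "/test/file" and the `fileLen` variable are illustrative stand-ins, and a running MiniDFSCluster `cluster` is assumed:

    // Hypothetical snippet: locate the first block of a test file, then read
    // it fully via the shared utility and close the reader.
    final DistributedFileSystem fs = cluster.getFileSystem();
    final LocatedBlock lblock =
        fs.getClient().getLocatedBlocks("/test/file", 0L).get(0);
    final byte[] buf = new byte[fileLen];
    BlockReader blockReader = null;
    try {
      blockReader = BlockReaderTestUtil.getBlockReader(fs, lblock, 0, fileLen);
      blockReader.readAll(buf, 0, fileLen);
    } finally {
      if (blockReader != null) {
        blockReader.close();
      }
    }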
TestBlockReaderFactory.java

@@ -250,8 +250,8 @@ public void run() {
       LocatedBlock lblock = locatedBlocks.get(0); // first block
       BlockReader blockReader = null;
       try {
-        blockReader = BlockReaderTestUtil.
-            getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+        blockReader = BlockReaderTestUtil.getBlockReader(
+            cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
         Assert.fail("expected getBlockReader to fail the first time.");
       } catch (Throwable t) {
         Assert.assertTrue("expected to see 'TCP reads were disabled " +
@@ -265,8 +265,8 @@ public void run() {
 
       // Second time should succeed.
       try {
-        blockReader = BlockReaderTestUtil.
-            getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+        blockReader = BlockReaderTestUtil.getBlockReader(
+            cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
       } catch (Throwable t) {
         LOG.error("error trying to retrieve a block reader " +
             "the second time.", t);
@@ -474,8 +474,8 @@ public void run() {
         while (true) {
           BlockReader blockReader = null;
           try {
-            blockReader = BlockReaderTestUtil.
-                getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+            blockReader = BlockReaderTestUtil.getBlockReader(
+                cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
             sem.release();
             try {
               blockReader.readAll(buf, 0, TEST_FILE_LEN);
@@ -514,8 +514,8 @@ public void run() {
       // getting a ClosedChannelException.
       BlockReader blockReader = null;
       try {
-        blockReader = BlockReaderTestUtil.
-            getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+        blockReader = BlockReaderTestUtil.getBlockReader(
+            cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
         blockReader.readFully(buf, 0, TEST_FILE_LEN);
       } finally {
         if (blockReader != null) blockReader.close();
TestDFSStripedOutputStream.java

@@ -18,8 +18,6 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -29,25 +27,14 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -59,7 +46,6 @@ public class TestDFSStripedOutputStream {
   private int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
 
   private MiniDFSCluster cluster;
-  private Configuration conf = new Configuration();
   private DistributedFileSystem fs;
   private final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
   private final int stripesPerBlock = 4;
@@ -173,7 +159,11 @@ private void testOneFile(String src, int writeBytes) throws IOException {
     // check file length
     FileStatus status = fs.getFileStatus(testPath);
     Assert.assertEquals(writeBytes, status.getLen());
 
+    checkData(src, writeBytes);
+  }
+
+  void checkData(String src, int writeBytes) throws IOException {
     List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
     LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0L);
@@ -199,11 +189,7 @@
       if (lblock == null) {
         continue;
       }
-      DatanodeInfo[] nodes = lblock.getLocations();
       ExtendedBlock block = lblock.getBlock();
-      InetSocketAddress targetAddr = NetUtils.createSocketAddr(
-          nodes[0].getXferAddr());
-
       byte[] blockBytes = new byte[(int)block.getNumBytes()];
       if (i < dataBlocks) {
         dataBlockBytes[i] = blockBytes;
@@ -215,40 +201,8 @@
         continue;
       }
 
-      BlockReader blockReader = new BlockReaderFactory(new DfsClientConf(conf)).
-          setFileName(src).
-          setBlock(block).
-          setBlockToken(lblock.getBlockToken()).
-          setInetSocketAddress(targetAddr).
-          setStartOffset(0).
-          setLength(block.getNumBytes()).
-          setVerifyChecksum(true).
-          setClientName("TestStripeLayoutWrite").
-          setDatanodeInfo(nodes[0]).
-          setCachingStrategy(CachingStrategy.newDefaultStrategy()).
-          setClientCacheContext(ClientContext.getFromConf(conf)).
-          setConfiguration(conf).
-          setRemotePeerFactory(new RemotePeerFactory() {
-            @Override
-            public Peer newConnectedPeer(InetSocketAddress addr,
-                Token<BlockTokenIdentifier> blockToken,
-                DatanodeID datanodeId)
-                throws IOException {
-              Peer peer = null;
-              Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
-              try {
-                sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
-                sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-                peer = TcpPeerServer.peerFromSocket(sock);
-              } finally {
-                if (peer == null) {
-                  IOUtils.closeSocket(sock);
-                }
-              }
-              return peer;
-            }
-          }).build();
+      final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
+          fs, lblock, 0, block.getNumBytes());
 
       blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
       blockReader.close();
