From 95b499a3671daae9018ae005c9384fb65aa37320 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Fri, 24 Jul 2015 13:52:50 -0700 Subject: [PATCH] HDFS-8798. Erasure Coding: fix DFSStripedInputStream/DFSStripedOutputStream re-fetch token when expired. Contributed by Walter Su. --- .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 + .../hadoop/hdfs/DFSStripedInputStream.java | 93 +++++------ .../hadoop/hdfs/StripedDataStreamer.java | 5 +- .../hadoop/hdfs/util/StripedBlockUtil.java | 4 +- .../hdfs/TestDFSStripedInputStream.java | 1 - ...TestDFSStripedOutputStreamWithFailure.java | 154 +++++++++--------- .../hdfs/server/balancer/TestBalancer.java | 9 + .../TestBlockTokenWithDFSStriped.java | 44 +++-- 8 files changed, 159 insertions(+), 154 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 9741585d74da0..2f7a88a2706ee 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -373,3 +373,6 @@ HDFS-8813. Erasure Coding: Client no need to decode missing parity blocks. (Walter Su via jing9) + + HDFS-8798. Erasure Coding: fix DFSStripedInputStream/DFSStripedOutputStream + re-fetch token when expired. (Walter Su via jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 4f3a8eda1da2e..1f64d4ed4af17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -20,7 +20,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ReadOption; -import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -44,7 +43,6 @@ import java.io.EOFException; import java.io.IOException; import java.io.InterruptedIOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.EnumSet; @@ -206,44 +204,6 @@ private synchronized void blockSeekTo(long target) throws IOException { currentLocatedBlock = targetBlockGroup; } - /** - * @throws IOException only when failing to refetch block token, which happens - * when this client cannot get located block information from NameNode. This - * method returns null instead of throwing exception when failing to connect - * to the DataNode. - */ - private BlockReader getBlockReaderWithRetry(LocatedBlock targetBlock, - long offsetInBlock, long length, InetSocketAddress targetAddr, - StorageType storageType, DatanodeInfo datanode, long offsetInFile, - ReaderRetryPolicy retry) throws IOException { - // only need to get a new access token or a new encryption key once - while (true) { - try { - return getBlockReader(targetBlock, offsetInBlock, length, targetAddr, - storageType, datanode); - } catch (IOException e) { - if (e instanceof InvalidEncryptionKeyException && - retry.shouldRefetchEncryptionKey()) { - DFSClient.LOG.info("Will fetch a new encryption key and retry, " - + "encryption key was invalid when connecting to " + targetAddr - + " : " + e); - dfsClient.clearDataEncryptionKey(); - retry.refetchEncryptionKey(); - } else if (retry.shouldRefetchToken() && - tokenRefetchNeeded(e, targetAddr)) { - fetchBlockAt(offsetInFile); - retry.refetchToken(); - } else { - DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block" - + ", add to deadNodes and continue.", e); - // Put chosen node into dead list, continue - addToDeadNodes(datanode); - return null; - } - } - } - } - /** * Extend the super method with the logic of switching between cells. * When reaching the end of a cell, proceed to the next cell and read it @@ -293,13 +253,13 @@ private void readOneStripe( final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen); final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize() - (stripeIndex * stripeLen), stripeLen); - curStripeRange = new StripeRange(offsetInBlockGroup, + StripeRange stripeRange = new StripeRange(offsetInBlockGroup, stripeLimit - stripeBufOffset); LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock; AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(schema, cellSize, blockGroup, offsetInBlockGroup, - offsetInBlockGroup + curStripeRange.length - 1, curStripeBuf); + offsetInBlockGroup + stripeRange.length - 1, curStripeBuf); final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup( blockGroup, cellSize, dataBlkNum, parityBlkNum); // read the whole stripe @@ -311,6 +271,7 @@ private void readOneStripe( } curStripeBuf.position(stripeBufOffset); curStripeBuf.limit(stripeLimit); + curStripeRange = stripeRange; } private Callable readCells(final BlockReader reader, @@ -423,7 +384,6 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy, } Map> corruptedBlockMap = new ConcurrentHashMap<>(); - failures = 0; if (pos < getFileLength()) { try { if (pos > blockEnd) { @@ -623,13 +583,46 @@ void readParityChunks(int num) throws IOException { boolean createBlockReader(LocatedBlock block, int chunkIndex) throws IOException { - DNAddrPair dnInfo = getBestNodeDNAddrPair(block, null); - if (dnInfo != null) { - BlockReader reader = getBlockReaderWithRetry(block, - alignedStripe.getOffsetInBlock(), - block.getBlockSize() - alignedStripe.getOffsetInBlock(), - dnInfo.addr, dnInfo.storageType, dnInfo.info, - block.getStartOffset(), new ReaderRetryPolicy()); + BlockReader reader = null; + final ReaderRetryPolicy retry = new ReaderRetryPolicy(); + DNAddrPair dnInfo = new DNAddrPair(null, null, null); + + while(true) { + try { + // the cached block location might have been re-fetched, so always + // get it from cache. + block = refreshLocatedBlock(block); + targetBlocks[chunkIndex] = block; + + // internal block has one location, just rule out the deadNodes + dnInfo = getBestNodeDNAddrPair(block, null); + if (dnInfo == null) { + break; + } + reader = getBlockReader(block, alignedStripe.getOffsetInBlock(), + block.getBlockSize() - alignedStripe.getOffsetInBlock(), + dnInfo.addr, dnInfo.storageType, dnInfo.info); + } catch (IOException e) { + if (e instanceof InvalidEncryptionKeyException && + retry.shouldRefetchEncryptionKey()) { + DFSClient.LOG.info("Will fetch a new encryption key and retry, " + + "encryption key was invalid when connecting to " + dnInfo.addr + + " : " + e); + dfsClient.clearDataEncryptionKey(); + retry.refetchEncryptionKey(); + } else if (retry.shouldRefetchToken() && + tokenRefetchNeeded(e, dnInfo.addr)) { + fetchBlockAt(block.getStartOffset()); + retry.refetchToken(); + } else { + //TODO: handles connection issues + DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " + + "block" + block.getBlock(), e); + // re-fetch the block in case the block has been moved + fetchBlockAt(block.getStartOffset()); + addToDeadNodes(dnInfo.info); + } + } if (reader != null) { readerInfos[chunkIndex] = new BlockReaderInfo(reader, block, dnInfo.info, alignedStripe.getOffsetInBlock()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index a1777962756a4..2d51dc43588ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -195,12 +195,15 @@ void populate() throws IOException { final ExtendedBlock bg = coordinator.getBlockGroup(); final LocatedBlock updated = callUpdateBlockForPipeline(bg); final long newGS = updated.getBlock().getGenerationStamp(); + final LocatedBlock[] updatedBlks = StripedBlockUtil + .parseStripedBlockGroup((LocatedStripedBlock) updated, + BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock(); if (bi != null) { final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS), null, null, null, -1, updated.isCorrupt(), null); - lb.setBlockToken(updated.getBlockToken()); + lb.setBlockToken(updatedBlks[i].getBlockToken()); newBlocks.offer(i, lb); } else { final LocatedBlock lb = coordinator.getFollowingBlocks().peek(i); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index a3ee1e8219c1f..4dc94a0d7ace4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -119,8 +119,8 @@ public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg, bg.getStartOffset(), bg.isCorrupt(), null); } Token[] blockTokens = bg.getBlockTokens(); - if (idxInBlockGroup < blockTokens.length) { - locatedBlock.setBlockToken(blockTokens[idxInBlockGroup]); + if (idxInReturnedLocs < blockTokens.length) { + locatedBlock.setBlockToken(blockTokens[idxInReturnedLocs]); } return locatedBlock; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index c520d2c770d80..baf610654ebc9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ECSchema; -import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; import org.junit.After; import org.junit.Assert; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java index 8944cde02de79..54fcdf8c0991b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java @@ -25,22 +25,27 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Level; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import com.google.common.base.Preconditions; @@ -63,17 +68,13 @@ public class TestDFSStripedOutputStreamWithFailure { private static final int FLUSH_POS = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1; - private final HdfsConfiguration conf = new HdfsConfiguration(); private MiniDFSCluster cluster; private DistributedFileSystem dfs; private final Path dir = new Path("/" + TestDFSStripedOutputStreamWithFailure.class.getSimpleName()); - - @Before - public void setup() throws IOException { + private void setup(Configuration conf) throws IOException { final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); cluster.waitActive(); dfs = cluster.getFileSystem(); @@ -81,8 +82,7 @@ public void setup() throws IOException { dfs.createErasureCodingZone(dir, null, 0); } - @After - public void tearDown() { + private void tearDown() { if (cluster != null) { cluster.shutdown(); } @@ -92,89 +92,76 @@ private static byte getByte(long pos) { return (byte)pos; } - @Test(timeout=120000) - public void testDatanodeFailure0() { - final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); - final int dn = 0; - runTest("file" + dn, length, dn); - } - - @Test(timeout=120000) - public void testDatanodeFailure1() { - final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); - final int dn = 1; - runTest("file" + dn, length, dn); - } - - @Test(timeout=120000) - public void testDatanodeFailure2() { - final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); - final int dn = 2; - runTest("file" + dn, length, dn); - } - - @Test(timeout=120000) - public void testDatanodeFailure3() { - final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); - final int dn = 3; - runTest("file" + dn, length, dn); - } - - @Test(timeout=120000) - public void testDatanodeFailure4() { - final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); - final int dn = 4; - runTest("file" + dn, length, dn); - } - - @Test(timeout=120000) - public void testDatanodeFailure5() { - final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); - final int dn = 5; - runTest("file" + dn, length, dn); - } - - @Test(timeout=120000) - public void testDatanodeFailure6() { - final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); - final int dn = 6; - runTest("file" + dn, length, dn); + private void initConf(Configuration conf){ + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); } - @Test(timeout=120000) - public void testDatanodeFailure7() { - final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); - final int dn = 7; - runTest("file" + dn, length, dn); + private void initConfWithBlockToken(Configuration conf) { + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + conf.setInt("ipc.client.connect.max.retries", 0); + // Set short retry timeouts so this test runs faster + conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); } - @Test(timeout=120000) - public void testDatanodeFailure8() { + @Test(timeout=240000) + public void testDatanodeFailure() throws Exception { final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); - final int dn = 8; - runTest("file" + dn, length, dn); + HdfsConfiguration conf = new HdfsConfiguration(); + initConf(conf); + for (int dn = 0; dn < 9; dn++) { + try { + setup(conf); + cluster.startDataNodes(conf, 1, true, null, null); + cluster.waitActive(); + runTest(new Path(dir, "file" + dn), length, length / 2, dn, false); + } catch (Exception e) { + LOG.error("failed, dn=" + dn + ", length=" + length); + throw e; + } finally { + tearDown(); + } + } } - private void runTest(final String src, final int length, final int dnIndex) { - try { - cluster.startDataNodes(conf, 1, true, null, null); - cluster.waitActive(); - - runTest(new Path(dir, src), length, length/2, dnIndex); - } catch(Exception e) { - LOG.info("FAILED", e); - Assert.fail(StringUtils.stringifyException(e)); + @Test(timeout=240000) + public void testBlockTokenExpired() throws Exception { + final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE); + HdfsConfiguration conf = new HdfsConfiguration(); + initConf(conf); + initConfWithBlockToken(conf); + for (int dn = 0; dn < 9; dn += 2) { + try { + setup(conf); + cluster.startDataNodes(conf, 1, true, null, null); + cluster.waitActive(); + runTest(new Path(dir, "file" + dn), length, length / 2, dn, true); + } catch (Exception e) { + LOG.error("failed, dn=" + dn + ", length=" + length); + throw e; + } finally { + tearDown(); + } } } private void runTest(final Path p, final int length, final int killPos, - final int dnIndex) throws Exception { + final int dnIndex, final boolean tokenExpire) throws Exception { LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos + ", dnIndex=" + dnIndex); Preconditions.checkArgument(killPos < length); Preconditions.checkArgument(killPos > FLUSH_POS); final String fullPath = p.toString(); + final NameNode nn = cluster.getNameNode(); + final BlockManager bm = nn.getNamesystem().getBlockManager(); + final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); + + if (tokenExpire) { + // set a short token lifetime (1 second) + SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); + } + final AtomicInteger pos = new AtomicInteger(); final FSDataOutputStream out = dfs.create(p); final DFSStripedOutputStream stripedOut @@ -189,6 +176,11 @@ private void runTest(final Path p, final int length, final int killPos, Assert.assertTrue(oldGS != -1); Assert.assertEquals(oldGS, gs); + if (tokenExpire) { + DFSTestUtil.flushInternal(stripedOut); + waitTokenExpires(out); + } + killDatanode(cluster, stripedOut, dnIndex, pos); killed = true; } @@ -348,4 +340,14 @@ static void checkData(DistributedFileSystem dfs, String src, int length, killedDnIndex - dataBlockBytes.length); } } + + private void waitTokenExpires(FSDataOutputStream out) throws IOException { + Token token = DFSTestUtil.getBlockToken(out); + while (!SecurityTestUtil.isBlockTokenExpired(token)) { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 759eb45771c3f..8239e5f1d6d11 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -1469,10 +1469,19 @@ public void testManyBalancerSimultaneously() throws Exception { } } + public void integrationTestWithStripedFile(Configuration conf) throws Exception { + initConfWithStripe(conf); + doTestBalancerWithStripedFile(conf); + } + @Test(timeout = 100000) public void testBalancerWithStripedFile() throws Exception { Configuration conf = new Configuration(); initConfWithStripe(conf); + doTestBalancerWithStripedFile(conf); + } + + private void doTestBalancerWithStripedFile(Configuration conf) throws Exception { int numOfDatanodes = dataBlocks + parityBlocks + 2; int numOfRacks = dataBlocks; long capacity = 20 * DEFAULT_STRIPE_BLOCK_SIZE; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java index e212917b6f7aa..f985f54b57407 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java @@ -20,13 +20,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.balancer.TestBalancer; import org.apache.hadoop.hdfs.util.StripedBlockUtil; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -46,22 +44,6 @@ public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS { FILE_SIZE = BLOCK_SIZE * dataBlocks * 3; } - @Before - public void setup() throws IOException { - conf = getConf(); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); - cluster.getFileSystem().getClient() - .createErasureCodingZone("/", null, cellSize); - cluster.waitActive(); - } - - @After - public void tearDown() { - if (cluster != null) { - cluster.shutdown(); - } - } - private Configuration getConf() { Configuration conf = super.getConf(numDNs); conf.setInt("io.bytes.per.checksum", cellSize); @@ -71,14 +53,26 @@ private Configuration getConf() { @Test @Override public void testRead() throws Exception { - //TODO: DFSStripedInputStream handles token expiration -// doTestRead(conf, cluster, true); + conf = getConf(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient() + .createErasureCodingZone("/", null, cellSize); + try { + cluster.waitActive(); + doTestRead(conf, cluster, true); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } } + /** + * tested at {@link org.apache.hadoop.hdfs.TestDFSStripedOutputStreamWithFailure#testBlockTokenExpired()} + */ @Test @Override - public void testWrite() throws Exception { - //TODO: DFSStripedOutputStream handles token expiration + public void testWrite(){ } @Test @@ -90,7 +84,9 @@ public void testAppend() throws Exception { @Test @Override public void testEnd2End() throws Exception { - //TODO: DFSStripedOutputStream handles token expiration + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + new TestBalancer().integrationTestWithStripedFile(conf); } @Override