HDFS-927 DFSInputStream retries too many times for new block locations

saintstack committed Feb 9, 2010
1 parent 2e5098c commit a469ff2eff573cb109c15f6a71753a3370c1ebf0
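The gist of the change, before the diff: DFSInputStream keeps a counter of how many times it has gone back to the namenode for a fresh list of block locations. This patch makes that count per-operation rather than per-stream: the counter is reset at the start of each user-facing read, and capped at dfs.client.max.block.acquire.failures. A minimal sketch of the pattern, for orientation only (the class and helper names below are illustrative, not the real DFSClient):

    import java.io.IOException;

    /**
     * Illustrative sketch (not the actual DFSClient) of the pattern this
     * commit introduces: the failure counter is reset at the start of each
     * user-facing read, so the cap applies per operation, not per stream.
     */
    class RetryCappedStream {
      private final int maxBlockAcquireFailures; // dfs.client.max.block.acquire.failures
      private int failures = 0; // namenode re-fetches during the *current* operation

      RetryCappedStream(int maxBlockAcquireFailures) {
        this.maxBlockAcquireFailures = maxBlockAcquireFailures;
      }

      int read(byte[] buf, int off, int len) throws IOException {
        failures = 0; // reset per user call -- the heart of the fix
        while (true) {
          try {
            return readFromCurrentLocations(buf, off, len);
          } catch (IOException ioe) {
            refetchBlockLocations(); // throws once the per-operation cap is hit
          }
        }
      }

      private void refetchBlockLocations() throws IOException {
        if (++failures > maxBlockAcquireFailures) {
          throw new IOException("Could not obtain block: exceeded "
              + maxBlockAcquireFailures + " location fetches in one operation");
        }
        // go back to the namenode for a fresh list of block locations here
      }

      private int readFromCurrentLocations(byte[] buf, int off, int len)
          throws IOException {
        return -1; // placeholder for the real datanode read
      }
    }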
.classpath
@@ -29,6 +29,7 @@
<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/jets3t-0.6.1.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/junit-3.8.1.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/log4j-1.2.15.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/mockito-all-1.8.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/oro-2.0.8.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/jetty-6.1.14.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/jetty-util-6.1.14.jar"/>
ivy.xml
@@ -256,6 +256,10 @@
rev="${slf4j-log4j12.version}"
conf="common->master">
</dependency>
- </dependencies>
+ <dependency org="org.mockito"
+ name="mockito-all"
+ rev="${mockito-all.version}"
+ conf="common->master"/>
+ </dependencies>
</ivy-module>
libraries.properties
@@ -57,6 +57,8 @@ kfs.version=0.1
log4j.version=1.2.15
lucene-core.version=2.3.1
+mockito-all.version=1.8.0
+
oro.version=2.0.8
rats-lib.version=0.5.1
DFSClient.java
@@ -186,9 +186,7 @@ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
// dfs.write.packet.size is an internal config variable
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
- this.maxBlockAcquireFailures =
- conf.getInt("dfs.client.max.block.acquire.failures",
- MAX_BLOCK_ACQUIRE_FAILURES);
+ this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
try {
this.ugi = UnixUserGroupInformation.login(conf, true);
@@ -218,6 +216,11 @@ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
}
}
+ static int getMaxBlockAcquireFailures(Configuration conf) {
+ return conf.getInt("dfs.client.max.block.acquire.failures",
+ MAX_BLOCK_ACQUIRE_FAILURES);
+ }
+
private void checkOpen() throws IOException {
if (!clientRunning) {
IOException result = new IOException("Filesystem closed");
@@ -1450,6 +1453,18 @@ private void checksumOk(Socket sock) {
private Block currentBlock = null;
private long pos = 0;
private long blockEnd = -1;
+
+ /**
+ * This variable tracks the number of failures since the start of the
+ * most recent user-facing operation. That is to say, it should be reset
+ * whenever the user makes a call on this stream, and if at any point
+ * during the retry logic the failure count exceeds a threshold,
+ * the errors will be thrown back to the operation.
+ *
+ * Specifically, this counts the number of times the client has gone
+ * back to the namenode to get a new list of block locations, and is
+ * capped at maxBlockAcquireFailures.
+ */
private int failures = 0;
/* XXX Use of ConcurrentHashMap is a temp fix. Need to fix
@@ -1742,6 +1757,8 @@ public synchronized int read(byte buf[], int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
+ failures = 0;
+
if (pos < getFileLength()) {
int retries = 2;
while (retries > 0) {
@@ -1885,6 +1902,7 @@ public int read(long position, byte[] buffer, int offset, int length)
if (closed) {
throw new IOException("Stream closed");
}
+ failures = 0;
long filelen = getFileLength();
if ((position < 0) || (position >= filelen)) {
return -1;
TestCrcCorruption.java
@@ -23,11 +23,13 @@
import java.nio.ByteBuffer;
import java.util.Random;
import junit.framework.*;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
/**
* A JUnit test for corrupted file handling.
@@ -57,17 +59,7 @@
* replica was created from the non-corrupted replica.
*/
public class TestCrcCorruption extends TestCase {
-
- public TestCrcCorruption(String testName) {
- super(testName);
- }
-
- protected void setUp() throws Exception {
- }
- protected void tearDown() throws Exception {
- }
-
/**
* check if DFS can handle corrupted CRC blocks
*/
@@ -222,4 +214,57 @@ public void testCrcCorruption() throws Exception {
DFSTestUtil util2 = new DFSTestUtil("TestCrcCorruption", 40, 3, 400);
thistest(conf2, util2);
}
+
+ /**
+ * Make a single-DN cluster, corrupt a block, and make sure
+ * there's no infinite loop, but rather it eventually
+ * reports the exception to the client.
+ */
+ public void testEntirelyCorruptFileOneNode() throws Exception {
+ doTestEntirelyCorruptFile(1);
+ }
+
+ /**
+ * Same thing with multiple datanodes - historically, this has
+ * behaved differently from the single-node case above.
+ *
+ * This test usually completes in around 15 seconds - if it
+ * times out, this suggests that the client is retrying
+ * indefinitely.
+ */
+ public void testEntirelyCorruptFileThreeNodes() throws Exception {
+ doTestEntirelyCorruptFile(3);
+ }
+
+ private void doTestEntirelyCorruptFile(int numDataNodes) throws Exception {
+ long fileSize = 4096;
+ Path file = new Path("/testFile");
+
+ Configuration conf = new Configuration();
+ conf.setInt("dfs.replication", numDataNodes);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+
+ try {
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+
+ DFSTestUtil.createFile(fs, file, fileSize, (short)numDataNodes, 12345L /*seed*/);
+ DFSTestUtil.waitReplication(fs, file, (short)numDataNodes);
+
+ String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
+ cluster.corruptBlockOnDataNodes(block);
+
+ try {
+ IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), conf,
+ true);
+ fail("Didn't get exception");
+ } catch (IOException ioe) {
+ DFSClient.LOG.info("Got expected exception", ioe);
+ }
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
}
TestDFSClientRetries.java
@@ -20,22 +20,28 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient.DFSInputStream;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
import org.apache.hadoop.hdfs.server.common.*;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException;
import junit.framework.TestCase;
-
+import static org.mockito.Mockito.*;
+import org.mockito.stubbing.Answer;
+import org.mockito.invocation.InvocationOnMock;
/**
* These tests make sure that DFSClient retries fetching data from DFS
@@ -234,5 +240,124 @@ public void testNotYetReplicatedErrors() throws IOException
e.getMessage().equals(tnn.ADD_BLOCK_EXCEPTION));
}
}
-
+
+ /**
+ * This tests that DFSInputStream failures are counted for a given read
+ * operation, and not over the lifetime of the stream. It is a regression
+ * test for HDFS-127.
+ */
+ public void testFailuresArePerOperation() throws Exception
+ {
+ long fileSize = 4096;
+ Path file = new Path("/testFile");
+
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+
+ int maxBlockAcquires = DFSClient.getMaxBlockAcquireFailures(conf);
+ assertTrue(maxBlockAcquires > 0);
+
+ try {
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+ NameNode preSpyNN = cluster.getNameNode();
+ NameNode spyNN = spy(preSpyNN);
+ DFSClient client = new DFSClient(null, spyNN, conf, null);
+
+ DFSTestUtil.createFile(fs, file, fileSize, (short)1, 12345L /*seed*/);
+
+ // If the client will retry maxBlockAcquires times, then if we fail
+ // any more than that number of times, the operation should entirely
+ // fail.
+ doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires + 1))
+ .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+ try {
+ IOUtils.copyBytes(client.open(file.toString()), new IOUtils.NullOutputStream(), conf,
+ true);
+ fail("Didn't get exception");
+ } catch (IOException ioe) {
+ DFSClient.LOG.info("Got expected exception", ioe);
+ }
+
+ // If we fail exactly that many times, then it should succeed.
+ doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+ .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+ IOUtils.copyBytes(client.open(file.toString()), new IOUtils.NullOutputStream(), conf,
+ true);
+
+ DFSClient.LOG.info("Starting test case for failure reset");
+
+ // Now the tricky case - if we fail a few times on one read, then succeed,
+ // then fail some more on another read, the second read should still succeed,
+ // since the failure count is reset at the start of each operation.
+ doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+ .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+ DFSInputStream is = client.open(file.toString());
+ byte buf[] = new byte[10];
+ IOUtils.readFully(is, buf, 0, buf.length);
+
+ DFSClient.LOG.info("First read successful after some failures.");
+
+ // Further reads at this point will succeed since it has the good block locations.
+ // So, force the block locations on this stream to be refreshed from bad info.
+ // When reading again, it should start from a fresh failure count, since
+ // we're starting a new operation on the user level.
+ doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
+ .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
+ is.openInfo();
+ // Seek to beginning forces a reopen of the BlockReader - otherwise it'll
+ // just keep reading on the existing stream and the fact that we've poisoned
+ // the block info won't do anything.
+ is.seek(0);
+ IOUtils.readFully(is, buf, 0, buf.length);
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Mock Answer implementation of NN.getBlockLocations that will return
+ * a poisoned block list a certain number of times before returning
+ * a proper one.
+ */
+ private static class FailNTimesAnswer implements Answer<LocatedBlocks> {
+ private int failuresLeft;
+ private NameNode realNN;
+
+ public FailNTimesAnswer(NameNode realNN, int timesToFail) {
+ failuresLeft = timesToFail;
+ this.realNN = realNN;
+ }
+
+ public LocatedBlocks answer(InvocationOnMock invocation) throws IOException {
+ Object args[] = invocation.getArguments();
+ LocatedBlocks realAnswer = realNN.getBlockLocations(
+ (String)args[0],
+ (Long)args[1],
+ (Long)args[2]);
+
+ if (failuresLeft-- > 0) {
+ NameNode.LOG.info("FailNTimesAnswer injecting failure.");
+ return makeBadBlockList(realAnswer);
+ }
+ NameNode.LOG.info("FailNTimesAnswer no longer failing.");
+ return realAnswer;
+ }
+
+ private LocatedBlocks makeBadBlockList(LocatedBlocks goodBlockList) {
+ LocatedBlock goodLocatedBlock = goodBlockList.get(0);
+ LocatedBlock badLocatedBlock = new LocatedBlock(
+ goodLocatedBlock.getBlock(),
+ new DatanodeInfo[] {
+ new DatanodeInfo(new DatanodeID("255.255.255.255:234"))
+ },
+ goodLocatedBlock.getStartOffset(),
+ false);
+
+ List<LocatedBlock> badBlocks = new ArrayList<LocatedBlock>();
+ badBlocks.add(badLocatedBlock);
+ return new LocatedBlocks(goodBlockList.getFileLength(), badBlocks, false);
+ }
+ }
}
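
A note on the Mockito idiom the test above relies on: spy() wraps the real NameNode so that unstubbed calls pass through to it, while doAnswer(...).when(spy).method(...) intercepts matching calls without invoking the real method during stubbing. Below is a minimal, self-contained sketch of the same fail-N-times idea; the Fetcher class is hypothetical and exists only for illustration, and the lambda form assumes Java 8+ with a modern Mockito (the committed test targets mockito-all 1.8.0 and therefore spells out an explicit Answer class instead):

    import static org.mockito.Mockito.*;

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;

    /** Hypothetical service, used only to demonstrate the stubbing pattern. */
    class Fetcher {
      String fetch(String key) throws IOException { return "value:" + key; }
    }

    public class FailNTimesDemo {
      public static void main(String[] args) throws Exception {
        Fetcher spyFetcher = spy(new Fetcher());
        AtomicInteger failuresLeft = new AtomicInteger(2);

        // Like FailNTimesAnswer above: fail the first N calls, then
        // delegate to the real object for a correct answer.
        doAnswer(invocation -> {
          if (failuresLeft.getAndDecrement() > 0) {
            throw new IOException("injected failure");
          }
          return invocation.callRealMethod();
        }).when(spyFetcher).fetch(anyString());

        for (int i = 0; i < 3; i++) {
          try {
            System.out.println(spyFetcher.fetch("k")); // succeeds on the 3rd try
          } catch (IOException ioe) {
            System.out.println("attempt " + i + " failed: " + ioe.getMessage());
          }
        }
      }
    }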
