
Commit

HDFS-7738. Revise the exception message for recover lease; add more truncate tests such as truncate with HA setup, negative tests, truncate with other operations and multiple truncates.
Tsz-Wo Nicholas Sze committed Feb 7, 2015
1 parent cfb829e commit 8f7d4bb
Showing 10 changed files with 319 additions and 76 deletions.
4 changes: 4 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -584,6 +584,10 @@ Release 2.7.0 - UNRELEASED

    HDFS-7710. Remove dead code in BackupImage.java. (Xiaoyu Yao via aajisaka)

+    HDFS-7738. Revise the exception message for recover lease; add more truncate
+    tests such as truncate with HA setup, negative tests, truncate with other
+    operations and multiple truncates. (szetszwo)
+
  OPTIMIZATIONS

    HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
@@ -227,7 +227,6 @@
 import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -1966,8 +1965,9 @@ boolean truncateInternal(String src, long newLength,
       throw new UnsupportedOperationException(
           "Cannot truncate lazy persist file " + src);
     }
-    // Opening an existing file for write. May need lease recovery.
-    recoverLeaseInternal(iip, src, clientName, clientMachine, false);
+    // Opening an existing file for truncate. May need lease recovery.
+    recoverLeaseInternal(RecoverLeaseOp.TRUNCATE_FILE,
+        iip, src, clientName, clientMachine, false);
     // Truncate length check.
     long oldLength = file.computeFileSize();
     if(oldLength == newLength) {
@@ -2498,7 +2498,8 @@ private BlocksMapUpdateInfo startFileInternal(FSPermissionChecker pc,
         }
       } else {
         // If lease soft limit time is expired, recover the lease
-        recoverLeaseInternal(iip, src, holder, clientMachine, false);
+        recoverLeaseInternal(RecoverLeaseOp.CREATE_FILE,
+            iip, src, holder, clientMachine, false);
         throw new FileAlreadyExistsException(src + " for client " +
             clientMachine + " already exists");
       }
@@ -2620,8 +2621,9 @@ private LocatedBlock appendFileInternal(FSPermissionChecker pc,
       throw new UnsupportedOperationException(
           "Cannot append to lazy persist file " + src);
     }
-    // Opening an existing file for write - may need to recover lease.
-    recoverLeaseInternal(iip, src, holder, clientMachine, false);
+    // Opening an existing file for append - may need to recover lease.
+    recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE,
+        iip, src, holder, clientMachine, false);

     final BlockInfo lastBlock = myFile.getLastBlock();
     // Check that the block has at least minimum replication.
@@ -2720,7 +2722,8 @@ boolean recoverLease(String src, String holder, String clientMachine)
         dir.checkPathAccess(pc, iip, FsAction.WRITE);
       }

-      recoverLeaseInternal(iip, src, holder, clientMachine, true);
+      recoverLeaseInternal(RecoverLeaseOp.RECOVER_LEASE,
+          iip, src, holder, clientMachine, true);
     } catch (StandbyException se) {
       skipSync = true;
       throw se;
@@ -2735,7 +2738,20 @@ boolean recoverLease(String src, String holder, String clientMachine)
     return false;
   }

-  void recoverLeaseInternal(INodesInPath iip,
+  private enum RecoverLeaseOp {
+    CREATE_FILE,
+    APPEND_FILE,
+    TRUNCATE_FILE,
+    RECOVER_LEASE;
+
+    private String getExceptionMessage(String src, String holder,
+        String clientMachine, String reason) {
+      return "Failed to " + this + " " + src + " for " + holder +
+          " on " + clientMachine + " because " + reason;
+    }
+  }
+
+  void recoverLeaseInternal(RecoverLeaseOp op, INodesInPath iip,
       String src, String holder, String clientMachine, boolean force)
       throws IOException {
     assert hasWriteLock();
@@ -2746,18 +2762,15 @@ void recoverLeaseInternal(INodesInPath iip,
     // leases. Find the appropriate lease record.
     //
     Lease lease = leaseManager.getLease(holder);
-    //
-    // We found the lease for this file. And surprisingly the original
-    // holder is trying to recreate this file. This should never occur.
-    //

     if (!force && lease != null) {
       Lease leaseFile = leaseManager.getLeaseByPath(src);
       if (leaseFile != null && leaseFile.equals(lease)) {
+        // We found the lease for this file but the original
+        // holder is trying to obtain it again.
         throw new AlreadyBeingCreatedException(
-            "failed to create file " + src + " for " + holder +
-            " for client " + clientMachine +
-            " because current leaseholder is trying to recreate file.");
+            op.getExceptionMessage(src, holder, clientMachine,
+                holder + " is already the current lease holder."));
       }
     }
     //
@@ -2768,9 +2781,8 @@ void recoverLeaseInternal(INodesInPath iip,
       lease = leaseManager.getLease(clientName);
       if (lease == null) {
         throw new AlreadyBeingCreatedException(
-            "failed to create file " + src + " for " + holder +
-            " for client " + clientMachine +
-            " because pendingCreates is non-null but no leases found.");
+            op.getExceptionMessage(src, holder, clientMachine,
+                "the file is under construction but no leases found."));
       }
       if (force) {
         // close now: no need to wait for soft lease expiration and
@@ -2792,20 +2804,21 @@ void recoverLeaseInternal(INodesInPath iip,
         boolean isClosed = internalReleaseLease(lease, src, iip, null);
         if(!isClosed)
           throw new RecoveryInProgressException(
-              "Failed to close file " + src +
-              ". Lease recovery is in progress. Try again later.");
+              op.getExceptionMessage(src, holder, clientMachine,
+                  "lease recovery is in progress. Try again later."));
       } else {
         final BlockInfo lastBlock = file.getLastBlock();
         if (lastBlock != null
             && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
-          throw new RecoveryInProgressException("Recovery in progress, file ["
-              + src + "], " + "lease owner [" + lease.getHolder() + "]");
+          throw new RecoveryInProgressException(
+              op.getExceptionMessage(src, holder, clientMachine,
+                  "another recovery is in progress by "
+                  + clientName + " on " + uc.getClientMachine()));
         } else {
-          throw new AlreadyBeingCreatedException("Failed to create file ["
-              + src + "] for [" + holder + "] for client [" + clientMachine
-              + "], because this file is already being created by ["
-              + clientName + "] on ["
-              + uc.getClientMachine() + "]");
+          throw new AlreadyBeingCreatedException(
+              op.getExceptionMessage(src, holder, clientMachine,
+                  "this file lease is currently owned by "
+                  + clientName + " on " + uc.getClientMachine()));
         }
       }
     }
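Note: the new RecoverLeaseOp helper formats every lease-recovery failure as "Failed to <OP> <src> for <holder> on <clientMachine> because <reason>". A small standalone sketch of that pattern, with invented path and client names (it mirrors the enum above for illustration and is not FSNamesystem code):

// Standalone illustration of the message pattern introduced by RecoverLeaseOp;
// the sample values below are made up.
public class RecoverLeaseMessageDemo {
  enum RecoverLeaseOp { CREATE_FILE, APPEND_FILE, TRUNCATE_FILE, RECOVER_LEASE }

  static String exceptionMessage(RecoverLeaseOp op, String src, String holder,
      String clientMachine, String reason) {
    return "Failed to " + op + " " + src + " for " + holder +
        " on " + clientMachine + " because " + reason;
  }

  public static void main(String[] args) {
    // Prints a single line:
    //   Failed to TRUNCATE_FILE /user/foo/bar for DFSClient_A on 127.0.0.1
    //   because DFSClient_A is already the current lease holder.
    System.out.println(exceptionMessage(RecoverLeaseOp.TRUNCATE_FILE,
        "/user/foo/bar", "DFSClient_A", "127.0.0.1",
        "DFSClient_A is already the current lease holder."));
  }
}

A client that hits the append path, for instance, now sees the operation name and the offending lease holder up front instead of the old "failed to create file ..." wording.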
@@ -18,9 +18,11 @@
 package org.apache.hadoop.hdfs;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;

 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Arrays;
 import java.util.Random;

 import org.apache.commons.logging.Log;
@@ -80,6 +82,27 @@ public static byte[] randomBytes(long seed, int size) {
     return b;
   }

+  /** @return a random file partition of length n. */
+  public static int[] randomFilePartition(int n, int parts) {
+    int[] p = new int[parts];
+    for(int i = 0; i < p.length; i++) {
+      p[i] = nextInt(n - i - 1) + 1;
+    }
+    Arrays.sort(p);
+    for(int i = 1; i < p.length; i++) {
+      if (p[i] <= p[i - 1]) {
+        p[i] = p[i - 1] + 1;
+      }
+    }
+
+    LOG.info("partition=" + Arrays.toString(p));
+    assertTrue("i=0", p[0] > 0 && p[0] < n);
+    for(int i = 1; i < p.length; i++) {
+      assertTrue("i=" + i, p[i] > p[i - 1] && p[i] < n);
+    }
+    return p;
+  }
+
   static void sleep(long ms) {
     try {
       Thread.sleep(ms);
@@ -173,13 +196,30 @@ public static FSDataOutputStream createFile(FileSystem fileSys, Path name, int r
         (short) repl, BLOCK_SIZE);
   }

+  public static void checkFullFile(FileSystem fs, Path file, int len,
+      final byte[] compareContent) throws IOException {
+    checkFullFile(fs, file, len, compareContent, file.toString());
+  }
+
   /**
    * Compare the content of a file created from FileSystem and Path with
    * the specified byte[] buffer's content
    * @throws IOException an exception might be thrown
    */
   public static void checkFullFile(FileSystem fs, Path name, int len,
       final byte[] compareContent, String message) throws IOException {
+    checkFullFile(fs, name, len, compareContent, message, true);
+  }
+
+  public static void checkFullFile(FileSystem fs, Path name, int len,
+      final byte[] compareContent, String message,
+      boolean checkFileStatus) throws IOException {
+    if (checkFileStatus) {
+      final FileStatus status = fs.getFileStatus(name);
+      assertEquals("len=" + len + " but status.getLen()=" + status.getLen(),
+          len, status.getLen());
+    }
+
     FSDataInputStream stm = fs.open(name);
     byte[] actual = new byte[len];
     stm.readFully(0, actual);
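Note: these helpers target the multiple-truncate tests named in the commit message. A rough sketch of how they could be combined in a test (hypothetical code, not part of this commit; the file path, seed, partition size, and the isFileClosed wait loop are assumptions):

// Hypothetical multi-truncate sketch built on the new test utilities.
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class MultiTruncateSketch {
  public static void main(String[] args) throws Exception {
    final int length = 512;
    final Configuration conf = new Configuration();
    final MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      final DistributedFileSystem fs = cluster.getFileSystem();
      final Path file = new Path("/multiTruncateSketch");
      final byte[] contents = AppendTestUtil.randomBytes(0L, length);
      try (FSDataOutputStream out = fs.create(file)) {
        out.write(contents);
      }
      // Pick random cut points and truncate to each, largest first,
      // verifying the surviving prefix after every step.
      final int[] offsets = AppendTestUtil.randomFilePartition(length, 3);
      for (int i = offsets.length - 1; i >= 0; i--) {
        final int newLength = offsets[i];
        final boolean done = fs.truncate(file, newLength);
        if (!done) {
          // A truncate on a non-block boundary triggers block recovery;
          // wait for the file to be closed again before reading it back.
          while (!fs.isFileClosed(file)) {
            Thread.sleep(100);
          }
        }
        AppendTestUtil.checkFullFile(fs, file, newLength,
            Arrays.copyOf(contents, newLength), file.toString());
      }
    } finally {
      cluster.shutdown();
    }
  }
}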
@@ -23,8 +23,8 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
-
 import com.google.common.collect.Maps;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -819,14 +819,17 @@ public List<String> getGroups(String user) throws IOException {
    * Get a FileSystem instance as specified user in a doAs block.
    */
   static public FileSystem getFileSystemAs(UserGroupInformation ugi,
-      final Configuration conf) throws IOException,
-      InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-      @Override
-      public FileSystem run() throws Exception {
-        return FileSystem.get(conf);
-      }
-    });
+      final Configuration conf) throws IOException {
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+        @Override
+        public FileSystem run() throws Exception {
+          return FileSystem.get(conf);
+        }
+      });
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+    }
   }

   public static byte[] generateSequentialBytes(int start, int length) {
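Note: with the interrupt folded into an InterruptedIOException, callers of getFileSystemAs (this appears to be the DFSTestUtil helper) only have to declare IOException. A hypothetical caller sketch:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.security.UserGroupInformation;

// Hypothetical caller; the user name is illustrative only.
class GetFileSystemAsCaller {
  static FileSystem openAsTestUser(Configuration conf) throws IOException {
    UserGroupInformation ugi =
        UserGroupInformation.createRemoteUser("testuser");
    // An interrupt now surfaces as InterruptedIOException (an IOException),
    // so no separate InterruptedException clause is needed.
    return DFSTestUtil.getFileSystemAs(ugi, conf);
  }
}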
@@ -103,9 +103,10 @@ private void checkFile(FileSystem fileSys, Path name, int repl)
       System.arraycopy(fileContents, 0, expected, 0, expected.length);
     }
     // do a sanity check. Read the file
+    // do not check file status since the file is not yet closed.
     AppendTestUtil.checkFullFile(fileSys, name,
         AppendTestUtil.NUM_BLOCKS * AppendTestUtil.BLOCK_SIZE,
-        expected, "Read 1");
+        expected, "Read 1", false);
   }

   /**
@@ -64,7 +64,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -85,7 +84,6 @@
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
@@ -405,8 +403,7 @@ public FileSystem run() throws Exception {
         fs2.create(p, false);
         fail("Did not throw!");
       } catch (IOException abce) {
-        GenericTestUtils.assertExceptionContains("already being created by",
-            abce);
+        GenericTestUtils.assertExceptionContains("Failed to CREATE_FILE", abce);
       }
       // NameNodeProxies' createNNProxyWithClientProtocol has 5 retries.
       assertCounter("AlreadyBeingCreatedExceptionNumOps",
@@ -490,8 +490,8 @@ public void testHFlushInterrupted() throws Exception {


       // verify that entire file is good
-      AppendTestUtil.checkFullFile(fs, p, 4,
-          fileContents, "Failed to deal with thread interruptions");
+      AppendTestUtil.checkFullFile(fs, p, 4, fileContents,
+          "Failed to deal with thread interruptions", false);
     } finally {
       cluster.shutdown();
     }
@@ -52,6 +52,7 @@
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -292,9 +293,13 @@ public void runFsFun(String msg, FSRun f) {
     try {
       f.run(fs);
       fail(msg);
-    } catch (IOException ioe) {
-      assertTrue(ioe.getMessage().contains("safe mode"));
-    }
+    } catch (RemoteException re) {
+      assertEquals(SafeModeException.class.getName(), re.getClassName());
+      GenericTestUtils.assertExceptionContains(
+          "Name node is in safe mode", re);
+    } catch (IOException ioe) {
+      fail(msg + " " + StringUtils.stringifyException(ioe));
+    }
   }

   /**
@@ -341,6 +346,12 @@ public void run(FileSystem fs) throws IOException {
       DFSTestUtil.appendFile(fs, file1, "new bytes");
     }});

+    runFsFun("Truncate file while in SM", new FSRun() {
+      @Override
+      public void run(FileSystem fs) throws IOException {
+        fs.truncate(file1, 0);
+      }});
+
     runFsFun("Delete file while in SM", new FSRun() {
       @Override
       public void run(FileSystem fs) throws IOException {
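Note: the reworked runFsFun assertion pins the failure to a server-side SafeModeException carried in a RemoteException rather than grepping the message for "safe mode". A minimal sketch of that unwrapping pattern (the exception class name is assumed from this test's imports):

import java.io.IOException;

import org.apache.hadoop.ipc.RemoteException;

// Sketch: identify a server-side SafeModeException wrapped in a RemoteException.
class SafeModeExceptionCheck {
  static boolean isSafeModeException(IOException ioe) {
    if (!(ioe instanceof RemoteException)) {
      return false;
    }
    // getClassName() holds the exception class thrown on the NameNode side.
    return "org.apache.hadoop.hdfs.server.namenode.SafeModeException"
        .equals(((RemoteException) ioe).getClassName());
  }
}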
