Skip to content

Commit

Permalink
HDFS-10745. Directly resolve paths into INodesInPath. Contributed by …
Browse files Browse the repository at this point in the history
…Daryn Sharp.
  • Loading branch information
kihwal committed Aug 17, 2016
1 parent e3037c5 commit 8693936
Show file tree
Hide file tree
Showing 17 changed files with 230 additions and 226 deletions.
Expand Up @@ -25,7 +25,6 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

import java.io.IOException;
Expand All @@ -39,11 +38,11 @@ static HdfsFileStatus modifyAclEntries(
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
src = fsd.resolvePath(pc, src);
INodesInPath iip;
fsd.writeLock();
try {
iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath(src), true);
iip = fsd.resolvePathForWrite(pc, src);
src = iip.getPath();
fsd.checkOwner(pc, iip);
INode inode = FSDirectory.resolveLastINode(iip);
int snapshotId = iip.getLatestSnapshotId();
Expand All @@ -64,11 +63,11 @@ static HdfsFileStatus removeAclEntries(
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
src = fsd.resolvePath(pc, src);
INodesInPath iip;
fsd.writeLock();
try {
iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath(src), true);
iip = fsd.resolvePathForWrite(pc, src);
src = iip.getPath();
fsd.checkOwner(pc, iip);
INode inode = FSDirectory.resolveLastINode(iip);
int snapshotId = iip.getLatestSnapshotId();
Expand All @@ -88,11 +87,11 @@ static HdfsFileStatus removeDefaultAcl(FSDirectory fsd, final String srcArg)
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
src = fsd.resolvePath(pc, src);
INodesInPath iip;
fsd.writeLock();
try {
iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath(src), true);
iip = fsd.resolvePathForWrite(pc, src);
src = iip.getPath();
fsd.checkOwner(pc, iip);
INode inode = FSDirectory.resolveLastINode(iip);
int snapshotId = iip.getLatestSnapshotId();
Expand All @@ -112,11 +111,11 @@ static HdfsFileStatus removeAcl(FSDirectory fsd, final String srcArg)
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
src = fsd.resolvePath(pc, src);
INodesInPath iip;
fsd.writeLock();
try {
iip = fsd.getINodesInPath4Write(src);
iip = fsd.resolvePathForWrite(pc, src);
src = iip.getPath();
fsd.checkOwner(pc, iip);
unprotectedRemoveAcl(fsd, iip);
} finally {
Expand All @@ -132,11 +131,11 @@ static HdfsFileStatus setAcl(
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
src = fsd.resolvePath(pc, src);
INodesInPath iip;
fsd.writeLock();
try {
iip = fsd.getINodesInPath4Write(src);
iip = fsd.resolvePathForWrite(pc, src);
src = iip.getPath();
fsd.checkOwner(pc, iip);
List<AclEntry> newAcl = unprotectedSetAcl(fsd, src, aclSpec, false);
fsd.getEditLog().logSetAcl(src, newAcl);
Expand All @@ -150,17 +149,15 @@ static AclStatus getAclStatus(
FSDirectory fsd, String src) throws IOException {
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
src = fsd.resolvePath(pc, src);
String srcs = FSDirectory.normalizePath(src);
fsd.readLock();
try {
INodesInPath iip = fsd.resolvePath(pc, src);
src = iip.getPath();
// There is no real inode for the path ending in ".snapshot", so return a
// non-null, unpopulated AclStatus. This is similar to getFileInfo.
if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR) &&
fsd.getINode4DotSnapshot(srcs) != null) {
if (iip.isDotSnapshotDir() && fsd.getINode4DotSnapshot(iip) != null) {
return new AclStatus.Builder().owner("").group("").build();
}
INodesInPath iip = fsd.getINodesInPath(srcs, true);
if (fsd.isPermissionEnabled()) {
fsd.checkTraverse(pc, iip);
}
Expand Down
Expand Up @@ -87,8 +87,8 @@ static LastBlockWithStatus appendFile(final FSNamesystem fsn,
final String src;
fsd.writeLock();
try {
src = fsd.resolvePath(pc, srcArg);
final INodesInPath iip = fsd.getINodesInPath4Write(src);
final INodesInPath iip = fsd.resolvePathForWrite(pc, srcArg);
src = iip.getPath();
// Verify that the destination does not exist as a directory already
final INode inode = iip.getLastINode();
final String path = iip.getPath();
Expand Down
Expand Up @@ -60,8 +60,8 @@ static HdfsFileStatus setPermission(
INodesInPath iip;
fsd.writeLock();
try {
src = fsd.resolvePath(pc, src);
iip = fsd.getINodesInPath4Write(src);
iip = fsd.resolvePathForWrite(pc, src);
src = iip.getPath();
fsd.checkOwner(pc, iip);
unprotectedSetPermission(fsd, src, permission);
} finally {
Expand All @@ -81,8 +81,8 @@ static HdfsFileStatus setOwner(
INodesInPath iip;
fsd.writeLock();
try {
src = fsd.resolvePath(pc, src);
iip = fsd.getINodesInPath4Write(src);
iip = fsd.resolvePathForWrite(pc, src);
src = iip.getPath();
fsd.checkOwner(pc, iip);
if (!pc.isSuperUser()) {
if (username != null && !pc.getUser().equals(username)) {
Expand All @@ -108,8 +108,8 @@ static HdfsFileStatus setTimes(
INodesInPath iip;
fsd.writeLock();
try {
src = fsd.resolvePath(pc, src);
iip = fsd.getINodesInPath4Write(src);
iip = fsd.resolvePathForWrite(pc, src);
src = iip.getPath();
// Write access is required to set access and modification times
if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
Expand Down Expand Up @@ -138,8 +138,8 @@ static boolean setReplication(
FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.writeLock();
try {
src = fsd.resolvePath(pc, src);
final INodesInPath iip = fsd.getINodesInPath4Write(src);
final INodesInPath iip = fsd.resolvePathForWrite(pc, src);
src = iip.getPath();
if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
Expand Down Expand Up @@ -211,8 +211,7 @@ static BlockStoragePolicy getStoragePolicy(FSDirectory fsd, BlockManager bm,
FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.readLock();
try {
path = fsd.resolvePath(pc, path);
final INodesInPath iip = fsd.getINodesInPath(path, false);
final INodesInPath iip = fsd.resolvePath(pc, path, false);
if (fsd.isPermissionEnabled()) {
fsd.checkPathAccess(pc, iip, FsAction.READ);
}
Expand All @@ -232,8 +231,8 @@ static long getPreferredBlockSize(FSDirectory fsd, String src)
FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.readLock();
try {
src = fsd.resolvePath(pc, src);
final INodesInPath iip = fsd.getINodesInPath(src, false);
final INodesInPath iip = fsd.resolvePath(pc, src, false);
src = iip.getPath();
if (fsd.isPermissionEnabled()) {
fsd.checkTraverse(pc, iip);
}
Expand Down
Expand Up @@ -98,8 +98,8 @@ static BlocksMapUpdateInfo delete(
FSDirectory fsd = fsn.getFSDirectory();
FSPermissionChecker pc = fsd.getPermissionChecker();

src = fsd.resolvePath(pc, src);
final INodesInPath iip = fsd.getINodesInPath4Write(src, false);
final INodesInPath iip = fsd.resolvePathForWrite(pc, src, false);
src = iip.getPath();
if (!recursive && fsd.isNonEmptyDirectory(iip)) {
throw new PathIsNotEmptyDirectoryException(src + " is non empty");
}
Expand Down
Expand Up @@ -155,7 +155,8 @@ static HdfsFileStatus createEncryptionZone(final FSDirectory fsd,

fsd.writeLock();
try {
src = fsd.resolvePath(pc, srcArg);
final INodesInPath iip = fsd.resolvePath(pc, srcArg);
src = iip.getPath();
final XAttr ezXAttr = fsd.ezManager.createEncryptionZone(src, suite,
version, keyName);
xAttrs.add(ezXAttr);
Expand All @@ -178,13 +179,11 @@ static HdfsFileStatus createEncryptionZone(final FSDirectory fsd,
static Map.Entry<EncryptionZone, HdfsFileStatus> getEZForPath(
final FSDirectory fsd, final String srcArg, final FSPermissionChecker pc)
throws IOException {
final String src;
final INodesInPath iip;
final EncryptionZone ret;
fsd.readLock();
try {
src = fsd.resolvePath(pc, srcArg);
iip = fsd.getINodesInPath(src, true);
iip = fsd.resolvePath(pc, srcArg);
if (iip.getLastINode() == null) {
throw new FileNotFoundException("Path not found: " + iip.getPath());
}
Expand Down
Expand Up @@ -73,12 +73,12 @@ static HdfsFileStatus setErasureCodingPolicy(final FSNamesystem fsn,
FSPermissionChecker pc = null;
pc = fsn.getPermissionChecker();
FSDirectory fsd = fsn.getFSDirectory();
src = fsd.resolvePath(pc, src);
final INodesInPath iip;
List<XAttr> xAttrs;
fsd.writeLock();
try {
iip = fsd.getINodesInPath4Write(src, false);
iip = fsd.resolvePathForWrite(pc, src, false);
src = iip.getPath();
xAttrs = createErasureCodingPolicyXAttr(fsn, iip, ecPolicy);
} finally {
fsd.writeUnlock();
Expand Down Expand Up @@ -221,11 +221,9 @@ static ErasureCodingPolicy[] getErasureCodingPolicies(final FSNamesystem fsn)

private static INodesInPath getINodesInPath(final FSNamesystem fsn,
final String srcArg) throws IOException {
String src = srcArg;
final FSDirectory fsd = fsn.getFSDirectory();
final FSPermissionChecker pc = fsn.getPermissionChecker();
src = fsd.resolvePath(pc, src);
INodesInPath iip = fsd.getINodesInPath(src, true);
INodesInPath iip = fsd.resolvePath(pc, srcArg);
if (fsn.isPermissionEnabled()) {
fsn.getFSDirectory().checkPathAccess(pc, iip, FsAction.READ);
}
Expand Down
Expand Up @@ -52,8 +52,8 @@ static HdfsFileStatus mkdirs(FSNamesystem fsn, String src,
FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.writeLock();
try {
src = fsd.resolvePath(pc, src);
INodesInPath iip = fsd.getINodesInPath4Write(src);
INodesInPath iip = fsd.resolvePathForWrite(pc, src);
src = iip.getPath();
if (fsd.isPermissionEnabled()) {
fsd.checkTraverse(pc, iip);
}
Expand Down
Expand Up @@ -65,12 +65,14 @@ static RenameOldResult renameToInt(
FSPermissionChecker pc = fsd.getPermissionChecker();

HdfsFileStatus resultingStat = null;
src = fsd.resolvePath(pc, src);
dst = fsd.resolvePath(pc, dst);
// Rename does not operate on link targets
// Do not resolveLink when checking permissions of src and dst
INodesInPath srcIIP = fsd.resolvePathForWrite(pc, src, false);
INodesInPath dstIIP = fsd.resolvePathForWrite(pc, dst, false);
@SuppressWarnings("deprecation")
final boolean status = renameTo(fsd, pc, src, dst, logRetryCache);
final boolean status = renameTo(fsd, pc, srcIIP, dstIIP, logRetryCache);
if (status) {
INodesInPath dstIIP = fsd.getINodesInPath(dst, false);
dstIIP = fsd.getINodesInPath(dstIIP.getPath(), false);
resultingStat = fsd.getAuditFileInfo(dstIIP);
}
return new RenameOldResult(status, resultingStat);
Expand Down Expand Up @@ -238,9 +240,8 @@ static Map.Entry<BlocksMapUpdateInfo, HdfsFileStatus> renameToInt(
final FSPermissionChecker pc = fsd.getPermissionChecker();

BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
src = fsd.resolvePath(pc, src);
dst = fsd.resolvePath(pc, dst);
renameTo(fsd, pc, src, dst, collectedBlocks, logRetryCache, options);
// returns resolved path
dst = renameTo(fsd, pc, src, dst, collectedBlocks, logRetryCache, options);
INodesInPath dstIIP = fsd.getINodesInPath(dst, false);
HdfsFileStatus resultingStat = fsd.getAuditFileInfo(dstIIP);

Expand All @@ -252,11 +253,13 @@ static Map.Entry<BlocksMapUpdateInfo, HdfsFileStatus> renameToInt(
* @see {@link #unprotectedRenameTo(FSDirectory, String, String, INodesInPath,
* INodesInPath, long, BlocksMapUpdateInfo, Options.Rename...)}
*/
static void renameTo(FSDirectory fsd, FSPermissionChecker pc, String src,
static String renameTo(FSDirectory fsd, FSPermissionChecker pc, String src,
String dst, BlocksMapUpdateInfo collectedBlocks, boolean logRetryCache,
Options.Rename... options) throws IOException {
final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
final INodesInPath dstIIP = fsd.getINodesInPath4Write(dst, false);
final INodesInPath srcIIP = fsd.resolvePathForWrite(pc, src, false);
final INodesInPath dstIIP = fsd.resolvePathForWrite(pc, dst, false);
src = srcIIP.getPath();
dst = dstIIP.getPath();
if (fsd.isPermissionEnabled()) {
// Rename does not operate on link targets
// Do not resolveLink when checking permissions of src and dst
Expand All @@ -283,6 +286,7 @@ static void renameTo(FSDirectory fsd, FSPermissionChecker pc, String src,
fsd.writeUnlock();
}
fsd.getEditLog().logRename(src, dst, mtime, logRetryCache, options);
return dst;
}

/**
Expand Down Expand Up @@ -442,16 +446,17 @@ static boolean unprotectedRenameTo(FSDirectory fsd, String src, String dst,
@Deprecated
@SuppressWarnings("deprecation")
private static boolean renameTo(FSDirectory fsd, FSPermissionChecker pc,
String src, String dst, boolean logRetryCache) throws IOException {
// Rename does not operate on link targets
// Do not resolveLink when checking permissions of src and dst
// Check write access to parent of src
final INodesInPath srcIIP = fsd.getINodesInPath4Write(src, false);
INodesInPath srcIIP, INodesInPath dstIIP, boolean logRetryCache)
throws IOException {
String src = srcIIP.getPath();
String dst = dstIIP.getPath();
// Note: We should not be doing this. This is move() not renameTo().
final String actualDst = fsd.isDir(dst) ?
dst + Path.SEPARATOR + new Path(src).getName() : dst;
final INodesInPath dstIIP = fsd.getINodesInPath4Write(actualDst, false);
if (fsd.isDir(dst)) {
dstIIP = INodesInPath.append(dstIIP, null, srcIIP.getLastLocalName());
}
final String actualDst = dstIIP.getPath();
if (fsd.isPermissionEnabled()) {
// Check write access to parent of src
fsd.checkPermission(pc, srcIIP, false, null, FsAction.WRITE, null, null,
false);
// Check write access to ancestor of dst
Expand Down

0 comments on commit 8693936

Please sign in to comment.