Skip to content

Commit

Permalink
Update AbstractTFS file system calls.
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinjia committed Jan 6, 2016
1 parent f8d14b7 commit b791ad5
Showing 1 changed file with 52 additions and 65 deletions.
117 changes: 52 additions & 65 deletions clients/unshaded/src/main/java/tachyon/hadoop/AbstractTFS.java
Expand Up @@ -41,11 +41,11 @@
import tachyon.client.file.FileOutStream; import tachyon.client.file.FileOutStream;
import tachyon.client.file.FileSystemContext; import tachyon.client.file.FileSystemContext;
import tachyon.client.file.FileSystemMasterClient; import tachyon.client.file.FileSystemMasterClient;
import tachyon.client.file.TachyonFile;
import tachyon.client.file.FileSystem; import tachyon.client.file.FileSystem;
import tachyon.client.file.URIStatus;
import tachyon.client.file.options.CreateDirectoryOptions;
import tachyon.client.file.options.CreateFileOptions;
import tachyon.client.file.options.DeleteOptions; import tachyon.client.file.options.DeleteOptions;
import tachyon.client.file.options.MkdirOptions;
import tachyon.client.file.options.OutStreamOptions;
import tachyon.conf.TachyonConf; import tachyon.conf.TachyonConf;
import tachyon.exception.ConnectionFailedException; import tachyon.exception.ConnectionFailedException;
import tachyon.exception.ExceptionMessage; import tachyon.exception.ExceptionMessage;
Expand All @@ -54,7 +54,6 @@
import tachyon.exception.PreconditionMessage; import tachyon.exception.PreconditionMessage;
import tachyon.exception.TachyonException; import tachyon.exception.TachyonException;
import tachyon.thrift.FileBlockInfo; import tachyon.thrift.FileBlockInfo;
import tachyon.thrift.FileInfo;
import tachyon.thrift.NetAddress; import tachyon.thrift.NetAddress;
import tachyon.util.CommonUtils; import tachyon.util.CommonUtils;


Expand Down Expand Up @@ -86,22 +85,7 @@ abstract class AbstractTFS extends org.apache.hadoop.fs.FileSystem {
public FSDataOutputStream append(Path cPath, int bufferSize, Progressable progress) public FSDataOutputStream append(Path cPath, int bufferSize, Progressable progress)
throws IOException { throws IOException {
LOG.info("append({}, {}, {})", cPath, bufferSize, progress); LOG.info("append({}, {}, {})", cPath, bufferSize, progress);
if (mStatistics != null) { throw new UnsupportedOperationException("Append is not supported in Tachyon.");
mStatistics.incrementWriteOps(1);
}
TachyonURI path = new TachyonURI(Utils.getPathWithoutScheme(cPath));
TachyonFile file;
try {
file = mTFS.open(path);
if (mTFS.getInfo(file).length > 0) {
LOG.warn("Appending to nonempty file. This may be an error.");
}
} catch (TachyonException e) {
throw new IOException(e);
}

return new FSDataOutputStream(mTFS.getOutStream(file.getFileId(), OutStreamOptions.defaults()),
mStatistics);
} }


@Override @Override
Expand Down Expand Up @@ -135,27 +119,24 @@ public FSDataOutputStream create(Path cPath, FsPermission permission, boolean ov
// Check whether the file already exists, and delete it if overwrite is true // Check whether the file already exists, and delete it if overwrite is true
TachyonURI path = new TachyonURI(Utils.getPathWithoutScheme(cPath)); TachyonURI path = new TachyonURI(Utils.getPathWithoutScheme(cPath));
try { try {
TachyonFile file = mTFS.openIfExists(path); if (mTFS.exists(path)) {
if (file != null) {
if (!overwrite) { if (!overwrite) {
throw new IOException(ExceptionMessage.FILE_ALREADY_EXISTS.getMessage(cPath.toString())); throw new IOException(ExceptionMessage.FILE_ALREADY_EXISTS.getMessage(cPath.toString()));
} }
FileInfo info = mTFS.getInfo(file); if (mTFS.getStatus(path).isFolder()) {
if (info.isIsFolder()) {
throw new IOException( throw new IOException(
ExceptionMessage.FILE_CREATE_IS_DIRECTORY.getMessage(cPath.toString())); ExceptionMessage.FILE_CREATE_IS_DIRECTORY.getMessage(cPath.toString()));
} }
mTFS.delete(file); mTFS.delete(path);
} }
} catch (TachyonException e) { } catch (TachyonException e) {
throw new IOException(e); throw new IOException(e);
} }


// The file no longer exists at this point, so we can create it // The file no longer exists at this point, so we can create it
OutStreamOptions options = CreateFileOptions options = CreateFileOptions.defaults().setBlockSizeBytes(blockSize);
new OutStreamOptions.Builder(mTachyonConf).setBlockSizeBytes(blockSize).build();
try { try {
FileOutStream outStream = mTFS.getOutStream(path, options); FileOutStream outStream = mTFS.createFile(path, options);
return new FSDataOutputStream(outStream, mStatistics); return new FSDataOutputStream(outStream, mStatistics);
} catch (TachyonException e) { } catch (TachyonException e) {
throw new IOException(e); throw new IOException(e);
Expand Down Expand Up @@ -188,7 +169,7 @@ public FSDataOutputStream createNonRecursive(Path cPath, FsPermission permission
boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress)
throws IOException { throws IOException {
TachyonURI parentPath = new TachyonURI(Utils.getPathWithoutScheme(cPath.getParent())); TachyonURI parentPath = new TachyonURI(Utils.getPathWithoutScheme(cPath.getParent()));
tryOpen(parentPath); ensureExists(parentPath);
return this.create(cPath, permission, overwrite, bufferSize, replication, blockSize, progress); return this.create(cPath, permission, overwrite, bufferSize, replication, blockSize, progress);
} }


Expand Down Expand Up @@ -224,8 +205,7 @@ public boolean delete(Path cPath, boolean recursive) throws IOException {
TachyonURI path = new TachyonURI(Utils.getPathWithoutScheme(cPath)); TachyonURI path = new TachyonURI(Utils.getPathWithoutScheme(cPath));
DeleteOptions options = new DeleteOptions.Builder().setRecursive(recursive).build(); DeleteOptions options = new DeleteOptions.Builder().setRecursive(recursive).build();
try { try {
TachyonFile file = mTFS.open(path); mTFS.delete(path, options);
mTFS.delete(file, options);
return true; return true;
} catch (InvalidPathException e) { } catch (InvalidPathException e) {
LOG.info("delete failed: {}", e.getMessage()); LOG.info("delete failed: {}", e.getMessage());
Expand Down Expand Up @@ -254,8 +234,13 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long l
} }


TachyonURI path = new TachyonURI(Utils.getPathWithoutScheme(file.getPath())); TachyonURI path = new TachyonURI(Utils.getPathWithoutScheme(file.getPath()));
TachyonFile fileMetadata = tryOpen(path); URIStatus status;
List<FileBlockInfo> blocks = getFileBlocks(fileMetadata.getFileId()); try {
status = mTFS.getStatus(path);
} catch (TachyonException e) {
throw new IOException(e);
}
List<FileBlockInfo> blocks = getFileBlocks(status.getFileId());


List<BlockLocation> blockLocations = new ArrayList<BlockLocation>(); List<BlockLocation> blockLocations = new ArrayList<BlockLocation>();
for (int k = 0; k < blocks.size(); k ++) { for (int k = 0; k < blocks.size(); k ++) {
Expand Down Expand Up @@ -304,17 +289,16 @@ public FileStatus getFileStatus(Path path) throws IOException {
if (mStatistics != null) { if (mStatistics != null) {
mStatistics.incrementReadOps(1); mStatistics.incrementReadOps(1);
} }
FileInfo fileStatus; URIStatus fileStatus;
try { try {
TachyonFile file = mTFS.open(tPath); fileStatus = mTFS.getStatus(tPath);
fileStatus = mTFS.getInfo(file);
} catch (InvalidPathException e) { } catch (InvalidPathException e) {
throw new FileNotFoundException(e.getMessage()); throw new FileNotFoundException(e.getMessage());
} catch (TachyonException e) { } catch (TachyonException e) {
throw new IOException(e); throw new IOException(e);
} }


FileStatus ret = new FileStatus(fileStatus.getLength(), fileStatus.isIsFolder(), FileStatus ret = new FileStatus(fileStatus.getLength(), fileStatus.isFolder(),
BLOCK_REPLICATION_CONSTANT, fileStatus.getBlockSizeBytes(), fileStatus.getCreationTimeMs(), BLOCK_REPLICATION_CONSTANT, fileStatus.getBlockSizeBytes(), fileStatus.getCreationTimeMs(),
fileStatus.getCreationTimeMs(), null, null, null, new Path(mTachyonHeader + tPath)); fileStatus.getCreationTimeMs(), null, null, null, new Path(mTachyonHeader + tPath));
return ret; return ret;
Expand Down Expand Up @@ -396,21 +380,20 @@ public FileStatus[] listStatus(Path path) throws IOException {
mStatistics.incrementReadOps(1); mStatistics.incrementReadOps(1);
} }


List<FileInfo> files; List<URIStatus> statuses;
try { try {
TachyonFile file = mTFS.open(tPath); statuses = mTFS.listStatus(tPath);
files = mTFS.listStatus(file);
} catch (TachyonException e) { } catch (TachyonException e) {
throw new IOException(e); throw new IOException(e);
} }


FileStatus[] ret = new FileStatus[files.size()]; FileStatus[] ret = new FileStatus[statuses.size()];
for (int k = 0; k < files.size(); k ++) { for (int k = 0; k < statuses.size(); k ++) {
FileInfo info = files.get(k); URIStatus status = statuses.get(k);
// TODO(hy): Replicate 3 with the number of disk replications. // TODO(hy): Replicate 3 with the number of disk replications.
ret[k] = new FileStatus(info.getLength(), info.isFolder, 3, info.getBlockSizeBytes(), ret[k] = new FileStatus(status.getLength(), status.isFolder(), 3, status.getBlockSizeBytes(),
info.getCreationTimeMs(), info.getCreationTimeMs(), null, null, null, status.getCreationTimeMs(), status.getCreationTimeMs(), null, null, null,
new Path(mTachyonHeader + info.getPath())); new Path(mTachyonHeader + status.getPath()));
} }
return ret; return ret;
} }
Expand All @@ -430,10 +413,11 @@ public boolean mkdirs(Path cPath, FsPermission permission) throws IOException {
mStatistics.incrementWriteOps(1); mStatistics.incrementWriteOps(1);
} }
TachyonURI path = new TachyonURI(Utils.getPathWithoutScheme(cPath)); TachyonURI path = new TachyonURI(Utils.getPathWithoutScheme(cPath));
MkdirOptions options = CreateDirectoryOptions options =
new MkdirOptions.Builder(mTachyonConf).setRecursive(true).setAllowExists(true).build(); CreateDirectoryOptions.defaults().setRecursive(true).setAllowExists(true);
try { try {
return mTFS.mkdir(path, options); mTFS.createDirectory(path, options);
return true;
} catch (TachyonException e) { } catch (TachyonException e) {
throw new IOException(e); throw new IOException(e);
} }
Expand All @@ -455,11 +439,14 @@ public FSDataInputStream open(Path cPath, int bufferSize) throws IOException {
} }


TachyonURI path = new TachyonURI(Utils.getPathWithoutScheme(cPath)); TachyonURI path = new TachyonURI(Utils.getPathWithoutScheme(cPath));
TachyonFile file = tryOpen(path); try {
long fileId = file.getFileId(); long fileId = mTFS.getStatus(path).getFileId();


return new FSDataInputStream(new HdfsFileInputStream(fileId, return new FSDataInputStream(new HdfsFileInputStream(fileId,
Utils.getHDFSPath(path, mUnderFSAddress), getConf(), bufferSize, mStatistics)); Utils.getHDFSPath(path, mUnderFSAddress), getConf(), bufferSize, mStatistics));
} catch (TachyonException e) {
throw new IOException(e);
}
} }


@Override @Override
Expand All @@ -471,22 +458,22 @@ public boolean rename(Path src, Path dst) throws IOException {


TachyonURI srcPath = new TachyonURI(Utils.getPathWithoutScheme(src)); TachyonURI srcPath = new TachyonURI(Utils.getPathWithoutScheme(src));
TachyonURI dstPath = new TachyonURI(Utils.getPathWithoutScheme(dst)); TachyonURI dstPath = new TachyonURI(Utils.getPathWithoutScheme(dst));
TachyonFile srcFile = tryOpen(srcPath); ensureExists(srcPath);
FileInfo info; URIStatus dstStatus;
try { try {
TachyonFile file = mTFS.open(dstPath); dstStatus = mTFS.getStatus(dstPath);
info = mTFS.getInfo(file);
} catch (IOException e) { } catch (IOException e) {
info = null; dstStatus = null;
} catch (TachyonException e) { } catch (TachyonException e) {
info = null; dstStatus = null;
} }
// If the destination is an existing folder, try to move the src into the folder // If the destination is an existing folder, try to move the src into the folder
if (info != null && info.isFolder) { if (dstStatus != null && dstStatus.isFolder()) {
dstPath = dstPath.join(srcPath.getName()); dstPath = dstPath.join(srcPath.getName());
} }
try { try {
return mTFS.rename(srcFile, dstPath); mTFS.rename(srcPath, dstPath);
return true;
} catch (IOException e) { } catch (IOException e) {
LOG.error("Failed to rename {} to {}", src, dst, e); LOG.error("Failed to rename {} to {}", src, dst, e);
return false; return false;
Expand All @@ -507,15 +494,15 @@ public void setWorkingDirectory(Path path) {
} }


/** /**
* Convenience method which opens a {@link TachyonFile} for the given path, wrapping any * Convenience method which ensures the given path exists, wrapping any {@link TachyonException}
* {@link TachyonException} in {@link IOException}. * in {@link IOException}.
* *
* @param path the path to look up * @param path the path to look up
* @throws IOException if a Tachyon exception occurs * @throws IOException if a Tachyon exception occurs
*/ */
private TachyonFile tryOpen(TachyonURI path) throws IOException { private void ensureExists(TachyonURI path) throws IOException {
try { try {
return mTFS.open(path); mTFS.getStatus(path);
} catch (TachyonException te) { } catch (TachyonException te) {
throw new IOException(te); throw new IOException(te);
} }
Expand Down

0 comments on commit b791ad5

Please sign in to comment.