Skip to content

Commit

Permalink
Refactoring the user facing API.
Browse files Browse the repository at this point in the history
  • Loading branch information
jsimsa committed Sep 23, 2015
1 parent 4207b78 commit f1c31fb
Show file tree
Hide file tree
Showing 15 changed files with 772 additions and 253 deletions.
Expand Up @@ -17,6 +17,7 @@


import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Optional;


import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -25,7 +26,13 @@
import tachyon.TachyonURI; import tachyon.TachyonURI;
import tachyon.annotation.PublicApi; import tachyon.annotation.PublicApi;
import tachyon.client.FileSystemMasterClient; import tachyon.client.FileSystemMasterClient;
import tachyon.thrift.BlockInfoException; import tachyon.client.options.DeleteOptions;
import tachyon.client.options.FreeOptions;
import tachyon.client.options.LoadOptions;
import tachyon.client.options.MkdirOptions;
import tachyon.client.options.SetStateOptions;
import tachyon.exception.TachyonException;
import tachyon.exception.TachyonExceptionType;
import tachyon.thrift.FileAlreadyExistException; import tachyon.thrift.FileAlreadyExistException;
import tachyon.thrift.FileDoesNotExistException; import tachyon.thrift.FileDoesNotExistException;
import tachyon.thrift.FileInfo; import tachyon.thrift.FileInfo;
Expand All @@ -51,19 +58,6 @@ protected AbstractTachyonFileSystem() {
mContext = FileSystemContext.INSTANCE; mContext = FileSystemContext.INSTANCE;
} }


@Override
public long create(TachyonURI path, long blockSize, boolean recursive) throws
BlockInfoException, FileAlreadyExistException, InvalidPathException, IOException {
FileSystemMasterClient masterClient = mContext.acquireMasterClient();
try {
long fileId = masterClient.createFile(path.getPath(), blockSize, recursive);
LOG.info("Created file " + path + " with file ID: " + fileId);
return fileId;
} finally {
mContext.releaseMasterClient(masterClient);
}
}

/** /**
* {@inheritDoc} * {@inheritDoc}
* *
Expand All @@ -74,13 +68,14 @@ public long create(TachyonURI path, long blockSize, boolean recursive) throws
* current readers have relinquished their locks. * current readers have relinquished their locks.
*/ */
@Override @Override
public void delete(TachyonFile file, boolean recursive) throws FileDoesNotExistException, public void delete(TachyonFile file, DeleteOptions options) throws IOException, TachyonException {
IOException {
FileSystemMasterClient masterClient = mContext.acquireMasterClient(); FileSystemMasterClient masterClient = mContext.acquireMasterClient();
try { try {
masterClient.deleteFile(file.getFileId(), recursive); masterClient.deleteFile(file.getFileId(), options.isRecursive());
LOG.info( LOG.info(
"Deleted file " + file.getFileId() + " from both Tachyon Storage and under file system"); "Deleted file " + file.getFileId() + " from both Tachyon Storage and under file system");
} catch (FileDoesNotExistException e) {
throw new TachyonException(e.getMessage(), TachyonExceptionType.FILE_DOES_NOT_EXIST);
} finally { } finally {
mContext.releaseMasterClient(masterClient); mContext.releaseMasterClient(masterClient);
} }
Expand All @@ -92,12 +87,13 @@ public void delete(TachyonFile file, boolean recursive) throws FileDoesNotExistE
* This method is asynchronous and will be propagated to the workers through their heartbeats. * This method is asynchronous and will be propagated to the workers through their heartbeats.
*/ */
@Override @Override
public void free(TachyonFile file, boolean recursive) throws FileDoesNotExistException, public void free(TachyonFile file, FreeOptions options) throws IOException, TachyonException {
IOException {
FileSystemMasterClient masterClient = mContext.acquireMasterClient(); FileSystemMasterClient masterClient = mContext.acquireMasterClient();
try { try {
masterClient.free(file.getFileId(), recursive); masterClient.free(file.getFileId(), options.isRecursive());
LOG.info("Removed file " + file.getFileId() + " from Tachyon Storage"); LOG.info("Removed file " + file.getFileId() + " from Tachyon Storage");
} catch (FileDoesNotExistException e) {
throw new TachyonException(e.getMessage(), TachyonExceptionType.FILE_DOES_NOT_EXIST);
} finally { } finally {
mContext.releaseMasterClient(masterClient); mContext.releaseMasterClient(masterClient);
} }
Expand All @@ -110,7 +106,7 @@ public void free(TachyonFile file, boolean recursive) throws FileDoesNotExistExc
* path are possibly inconsistent. * path are possibly inconsistent.
*/ */
@Override @Override
public FileInfo getInfo(TachyonFile file) throws IOException { public FileInfo getInfo(TachyonFile file) throws IOException, TachyonException {
FileSystemMasterClient masterClient = mContext.acquireMasterClient(); FileSystemMasterClient masterClient = mContext.acquireMasterClient();
try { try {
return masterClient.getFileInfo(file.getFileId()); return masterClient.getFileInfo(file.getFileId());
Expand All @@ -128,10 +124,12 @@ public FileInfo getInfo(TachyonFile file) throws IOException {
* path are possibly inconsistent. * path are possibly inconsistent.
*/ */
@Override @Override
public List<FileInfo> listStatus(TachyonFile file) throws FileDoesNotExistException, IOException { public List<FileInfo> listStatus(TachyonFile file) throws IOException, TachyonException {
FileSystemMasterClient masterClient = mContext.acquireMasterClient(); FileSystemMasterClient masterClient = mContext.acquireMasterClient();
try { try {
return masterClient.getFileInfoList(file.getFileId()); return masterClient.getFileInfoList(file.getFileId());
} catch (FileDoesNotExistException e) {
throw new TachyonException(e.getMessage(), TachyonExceptionType.FILE_DOES_NOT_EXIST);
} finally { } finally {
mContext.releaseMasterClient(masterClient); mContext.releaseMasterClient(masterClient);
} }
Expand All @@ -145,73 +143,83 @@ public List<FileInfo> listStatus(TachyonFile file) throws FileDoesNotExistExcept
* example the load command of the Tachyon Shell. * example the load command of the Tachyon Shell.
*/ */
@Override @Override
public long loadFileInfoFromUfs(TachyonURI path, TachyonURI ufsPath, boolean recursive) public long load(TachyonURI path, TachyonURI ufsPath, LoadOptions options) throws IOException,
throws FileDoesNotExistException, IOException { TachyonException {
FileSystemMasterClient masterClient = mContext.acquireMasterClient(); FileSystemMasterClient masterClient = mContext.acquireMasterClient();
try { try {
long fileId = masterClient.loadFileInfoFromUfs(path.getPath(), ufsPath.toString(), recursive); long fileId =
LOG.info( masterClient.loadFileInfoFromUfs(path.getPath(), ufsPath.toString(),
"Loaded file " + path.getPath() + " from " + ufsPath + (recursive ? " recursively" : "")); options.isRecursive());
LOG.info("Loaded file " + path.getPath() + " from " + ufsPath
+ (options.isRecursive() ? " recursively" : ""));
return fileId; return fileId;
} catch (FileDoesNotExistException e) {
throw new TachyonException(e.getMessage(), TachyonExceptionType.FILE_DOES_NOT_EXIST);
} finally { } finally {
mContext.releaseMasterClient(masterClient); mContext.releaseMasterClient(masterClient);
} }
} }


@Override @Override
public boolean mkdirs(TachyonURI path, boolean recursive) throws InvalidPathException, public boolean mkdir(TachyonURI path, MkdirOptions options) throws IOException, TachyonException {
IOException, FileAlreadyExistException {
FileSystemMasterClient masterClient = mContext.acquireMasterClient(); FileSystemMasterClient masterClient = mContext.acquireMasterClient();
try { try {
boolean result = masterClient.createDirectory(path.getPath(), recursive); boolean result = masterClient.createDirectory(path.getPath(), options.isRecursive());
if (result) { if (result) {
LOG.info("Created directory " + path.getPath()); LOG.info("Created directory " + path.getPath());
} }
return result; return result;
} catch (FileAlreadyExistException e) {
throw new TachyonException(e.getMessage(), TachyonExceptionType.FILE_ALREADY_EXISTS);
} catch (InvalidPathException e) {
throw new TachyonException(e.getMessage(), TachyonExceptionType.INVALID_PATH);
} finally { } finally {
mContext.releaseMasterClient(masterClient); mContext.releaseMasterClient(masterClient);
} }
} }


@Override @Override
public TachyonFile open(TachyonURI path) throws InvalidPathException, IOException { public TachyonFile open(TachyonURI path) throws IOException, TachyonException {
FileSystemMasterClient masterClient = mContext.acquireMasterClient(); FileSystemMasterClient masterClient = mContext.acquireMasterClient();
try { try {
return new TachyonFile(masterClient.getFileId(path.getPath())); return new TachyonFile(masterClient.getFileId(path.getPath()));
} catch (InvalidPathException e) {
throw new TachyonException(e.getMessage(), TachyonExceptionType.INVALID_PATH);
} finally { } finally {
mContext.releaseMasterClient(masterClient); mContext.releaseMasterClient(masterClient);
} }
} }


/**
* {@inheritDoc}
*
* The pin status is propagated asynchronously from this method call on the worker heartbeats.
*/
@Override @Override
public void setPin(TachyonFile file, boolean pinned) throws FileDoesNotExistException, public boolean rename(TachyonFile src, TachyonURI dst) throws IOException, TachyonException {
IOException {
FileSystemMasterClient masterClient = mContext.acquireMasterClient(); FileSystemMasterClient masterClient = mContext.acquireMasterClient();
try { try {
masterClient.setPinned(file.getFileId(), pinned); boolean result = masterClient.renameFile(src.getFileId(), dst.getPath());
LOG.info(pinned ? "Pinned" : "Unpinned" + " file " + file.getFileId()); if (result) {
LOG.info("Renamed file " + src.getFileId() + " to " + dst.getPath());
}
return result;
} catch (FileDoesNotExistException e) {
throw new TachyonException(e.getMessage(), TachyonExceptionType.FILE_DOES_NOT_EXIST);
} finally { } finally {
mContext.releaseMasterClient(masterClient); mContext.releaseMasterClient(masterClient);
} }
} }


@Override @Override
public boolean rename(TachyonFile src, TachyonURI dst) throws FileDoesNotExistException, public void setState(TachyonFile file, SetStateOptions options) throws IOException,
IOException { TachyonException {
FileSystemMasterClient masterClient = mContext.acquireMasterClient(); FileSystemMasterClient masterClient = mContext.acquireMasterClient();
try { Optional<Boolean> pinned = options.getPinned();
boolean result = masterClient.renameFile(src.getFileId(), dst.getPath()); if (pinned.isPresent()) {
if (result) { try {
LOG.info("Renamed file " + src.getFileId() + " to " + dst.getPath()); masterClient.setPinned(file.getFileId(), pinned.get());
LOG.info(pinned.get() ? "Pinned" : "Unpinned" + " file " + file.getFileId());
} catch (FileDoesNotExistException e) {
throw new TachyonException(e.getMessage(), TachyonExceptionType.FILE_DOES_NOT_EXIST);
} finally {
mContext.releaseMasterClient(masterClient);
} }
return result;
} finally {
mContext.releaseMasterClient(masterClient);
} }
} }
} }
Expand Up @@ -28,15 +28,14 @@
import tachyon.annotation.PublicApi; import tachyon.annotation.PublicApi;
import tachyon.client.BoundedStream; import tachyon.client.BoundedStream;
import tachyon.client.ClientContext; import tachyon.client.ClientContext;
import tachyon.client.ClientOptions;
import tachyon.client.Seekable; import tachyon.client.Seekable;
import tachyon.client.TachyonStorageType; import tachyon.client.TachyonStorageType;
import tachyon.client.block.BlockInStream; import tachyon.client.block.BlockInStream;
import tachyon.client.block.BufferedBlockOutStream; import tachyon.client.block.BufferedBlockOutStream;
import tachyon.client.block.LocalBlockInStream; import tachyon.client.block.LocalBlockInStream;
import tachyon.client.options.InStreamOptions;
import tachyon.master.block.BlockId; import tachyon.master.block.BlockId;
import tachyon.thrift.FileInfo; import tachyon.thrift.FileInfo;
import tachyon.thrift.NetAddress;
import tachyon.util.network.NetworkAddressUtils; import tachyon.util.network.NetworkAddressUtils;


/** /**
Expand Down Expand Up @@ -84,7 +83,7 @@ public final class FileInStream extends InputStream implements BoundedStream, Se
* @param info the file information * @param info the file information
* @param options the client options * @param options the client options
*/ */
public FileInStream(FileInfo info, ClientOptions options) { public FileInStream(FileInfo info, InStreamOptions options) {
mBlockSize = info.getBlockSizeBytes(); mBlockSize = info.getBlockSizeBytes();
mFileLength = info.getLength(); mFileLength = info.getLength();
mBlockIds = info.getBlockIds(); mBlockIds = info.getBlockIds();
Expand Down Expand Up @@ -228,7 +227,7 @@ private void checkAndAdvanceBlockInStream() throws IOException {
try { try {
// TODO(calvin): Specify the location to be local. // TODO(calvin): Specify the location to be local.
mCurrentCacheStream = mCurrentCacheStream =
mContext.getTachyonBlockStore().getOutStream(currentBlockId, -1, mContext.getTachyonBlockStore().getOutStream(currentBlockId, -1,
NetworkAddressUtils.getLocalHostName(ClientContext.getConf())); NetworkAddressUtils.getLocalHostName(ClientContext.getConf()));
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Failed to get TachyonStore stream, the block " + currentBlockId LOG.warn("Failed to get TachyonStore stream, the block " + currentBlockId
Expand Down Expand Up @@ -288,7 +287,7 @@ private void seekBlockInStream(long newPos) throws IOException {
if (mPos % mBlockSize == 0 && mShouldCacheCurrentBlock) { if (mPos % mBlockSize == 0 && mShouldCacheCurrentBlock) {
try { try {
mCurrentCacheStream = mCurrentCacheStream =
mContext.getTachyonBlockStore().getOutStream(currentBlockId, -1, mContext.getTachyonBlockStore().getOutStream(currentBlockId, -1,
NetworkAddressUtils.getLocalHostName(ClientContext.getConf())); NetworkAddressUtils.getLocalHostName(ClientContext.getConf()));
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Failed to write to TachyonStore stream, block " + getCurrentBlockId() LOG.warn("Failed to write to TachyonStore stream, block " + getCurrentBlockId()
Expand Down
Expand Up @@ -30,12 +30,12 @@
import tachyon.annotation.PublicApi; import tachyon.annotation.PublicApi;
import tachyon.client.Cancelable; import tachyon.client.Cancelable;
import tachyon.client.ClientContext; import tachyon.client.ClientContext;
import tachyon.client.ClientOptions;
import tachyon.client.FileSystemMasterClient; import tachyon.client.FileSystemMasterClient;
import tachyon.client.TachyonStorageType; import tachyon.client.TachyonStorageType;
import tachyon.client.UnderStorageType; import tachyon.client.UnderStorageType;
import tachyon.client.block.BlockStoreContext; import tachyon.client.block.BlockStoreContext;
import tachyon.client.block.BufferedBlockOutStream; import tachyon.client.block.BufferedBlockOutStream;
import tachyon.client.options.OutStreamOptions;
import tachyon.underfs.UnderFileSystem; import tachyon.underfs.UnderFileSystem;
import tachyon.util.io.PathUtils; import tachyon.util.io.PathUtils;
import tachyon.worker.WorkerClient; import tachyon.worker.WorkerClient;
Expand Down Expand Up @@ -74,7 +74,7 @@ public final class FileOutStream extends OutputStream implements Cancelable {
* @param options the client options * @param options the client options
* @throws IOException if an I/O error occurs * @throws IOException if an I/O error occurs
*/ */
public FileOutStream(long fileId, ClientOptions options) throws IOException { public FileOutStream(long fileId, OutStreamOptions options) throws IOException {
mFileId = fileId; mFileId = fileId;
mBlockSize = options.getBlockSize(); mBlockSize = options.getBlockSize();
mTachyonStorageType = options.getTachyonStorageType(); mTachyonStorageType = options.getTachyonStorageType();
Expand Down

0 comments on commit f1c31fb

Please sign in to comment.