Skip to content

Commit

Permalink
Update with workerFileId.
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinjia committed Mar 8, 2016
1 parent 0b17b76 commit b93ee1f
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 24 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public final class FileSystemWorker extends AbstractWorker {
private final Configuration mConf; private final Configuration mConf;
/** Logic for handling RPC requests. */ /** Logic for handling RPC requests. */
private final FileSystemWorkerClientServiceHandler mServiceHandler; private final FileSystemWorkerClientServiceHandler mServiceHandler;
/** Manager for under file system operations */ /** Manager for under file system operations. */
private final UnderFileSystemManager mUnderFileSystemManager; private final UnderFileSystemManager mUnderFileSystemManager;


/** The service that persists files. */ /** The service that persists files. */
Expand Down Expand Up @@ -115,14 +115,41 @@ public void stop() {
getExecutorService().shutdown(); getExecutorService().shutdown();
} }


public void ufsCancelFile(String path) throws FileDoesNotExistException, IOException { /**
mUnderFileSystemManager.cancelFile(path); * Cancels a file currently being written to the under file system. The open stream will be
* closed and the partial file will be cleaned up.
*
* @param workerFileId the id of the file to cancel, only understood by the worker that created
* the file
* @throws FileDoesNotExistException if this worker is not writing the specified file
* @throws IOException if an error occurs interacting with the under file system
*/
public void ufsCancelFile(long workerFileId) throws FileDoesNotExistException, IOException {
mUnderFileSystemManager.cancelFile(workerFileId);
} }


public void ufsCompleteFile(String path) throws FileDoesNotExistException, IOException { /**
mUnderFileSystemManager.completeFile(path); * Completes a file currently being written to the under file system. The open stream will be
* closed and the partial file will be promoted to the completed file in the under file system.
*
* @param workerFileId the id of the file to cancel, only understood by the worker that created
* the file
* @throws FileDoesNotExistException if the worker is not writing the specified file
* @throws IOException if an error occurs interacting with the under file system
*/
public void ufsCompleteFile(long workerFileId) throws FileDoesNotExistException, IOException {
mUnderFileSystemManager.completeFile(workerFileId);
} }


/**
* Creates a new file in the under file system. This will register a new stream in the under
* file system manager. The stream can only be accessed with the returned id afterward.
*
* @param path the path of the file to create
* @throws FileAlreadyExistsException if a file already exists in the under file system with
* the same path
* @throws IOException if an error occurs interacting with the under file system
*/
public void ufsCreateFile(String path) throws FileAlreadyExistsException, IOException { public void ufsCreateFile(String path) throws FileAlreadyExistsException, IOException {
mUnderFileSystemManager.createFile(path); mUnderFileSystemManager.createFile(path);
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ public long getServiceVersion() {
} }


@Override @Override
public void ufsCancelFile(String path, UFSCancelFileTOptions options) public void ufsCancelFile(long workerFileId, UFSCancelFileTOptions options)
throws AlluxioTException, ThriftIOException { throws AlluxioTException, ThriftIOException {
try { try {
mWorker.ufsCancelFile(path); mWorker.ufsCancelFile(workerFileId);
} catch (IOException e) { } catch (IOException e) {
throw new ThriftIOException(e.getMessage()); throw new ThriftIOException(e.getMessage());
} catch (AlluxioException e) { } catch (AlluxioException e) {
Expand All @@ -59,10 +59,10 @@ public void ufsCancelFile(String path, UFSCancelFileTOptions options)
} }


@Override @Override
public void ufsCompleteFile(String path, UFSCompleteFileTOptions options) public void ufsCompleteFile(long workerFileId, UFSCompleteFileTOptions options)
throws AlluxioTException, ThriftIOException { throws AlluxioTException, ThriftIOException {
try { try {
mWorker.ufsCompleteFile(path); mWorker.ufsCompleteFile(workerFileId);
} catch (IOException e) { } catch (IOException e) {
throw new ThriftIOException(e.getMessage()); throw new ThriftIOException(e.getMessage());
} catch (AlluxioException e) { } catch (AlluxioException e) {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -17,38 +17,54 @@
* file names to open streams. * file names to open streams.
*/ */
public class UnderFileSystemManager { public class UnderFileSystemManager {
private ConcurrentMap<String, OutputStream> mFileToOutputStreamMap; private class NamedOutputStream {
private final OutputStream mStream;
private final String mPath;

private NamedOutputStream(OutputStream stream, String path) {
mStream = stream;
mPath = path;
}

public OutputStream getStream() {
return mStream;
}

public String getPath() {
return mPath;
}
}


private ConcurrentMap<Long, NamedOutputStream> mStreams;


public UnderFileSystemManager() { public UnderFileSystemManager() {
mFileToOutputStreamMap = new ConcurrentHashMap<>(); mStreams = new ConcurrentHashMap<>();
} }


public OutputStream createFile(String path) throws FileAlreadyExistsException, IOException { public OutputStream createFile(String path) throws FileAlreadyExistsException, IOException {
OutputStream stream = mFileToOutputStreamMap.get(path); UnderFileSystem ufs = UnderFileSystem.get(path, WorkerContext.getConf());
if (stream == null) { if (ufs.exists(path)) {
UnderFileSystem ufs = UnderFileSystem.get(path, WorkerContext.getConf()); throw new FileAlreadyExistsException(ExceptionMessage.FAILED_UFS_CREATE.getMessage(path));
OutputStream newStream = ufs.create(path);
stream = mFileToOutputStreamMap.putIfAbsent(path, newStream);
if (stream == null) {
return newStream;
}
} }
throw new FileAlreadyExistsException(ExceptionMessage.FAILED_UFS_CREATE.getMessage(path));
OutputStream newStream = ufs.create(path);
stream = mStreams.putIfAbsent(path, newStream);
} }


public void cancelFile(String path) throws FileDoesNotExistException, IOException { public void cancelFile(long workerFileId) throws FileDoesNotExistException, IOException {
closeFile(path); closeFile(path);
UnderFileSystem ufs = UnderFileSystem.get(path, WorkerContext.getConf()); UnderFileSystem ufs = UnderFileSystem.get(path, WorkerContext.getConf());
ufs.delete(path, false); ufs.delete(path, false);
} }


public void completeFile(String path) throws FileDoesNotExistException, IOException { public void completeFile(long workerFileId) throws FileDoesNotExistException, IOException {
closeFile(path); closeFile(path);
} }


// TODO(calvin): Make the exception accurate. // TODO(calvin): Make the exception accurate.
private void closeFile(String path) throws FileDoesNotExistException, IOException { private void closeFile(long workerFileId) throws FileDoesNotExistException, IOException {
OutputStream stream = mFileToOutputStreamMap.remove(path); OutputStream stream = mStreams.remove(workerFileId);
if (stream != null) { if (stream != null) {
stream.close(); stream.close();
} else { } else {
Expand Down

0 comments on commit b93ee1f

Please sign in to comment.