Skip to content

Commit

Permalink
Some cleanup and concurrency in the master.
Browse files Browse the repository at this point in the history
  • Loading branch information
jsimsa committed Sep 21, 2015
1 parent 6bb575b commit 2a3bb1a
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 69 deletions.
Expand Up @@ -258,7 +258,7 @@ public void addCheckpointTest() throws FileDoesNotExistException, SuspectedFileS
FileInfo fileInfo = mFsMaster.getFileInfo(fileId); FileInfo fileInfo = mFsMaster.getFileInfo(fileId);
Assert.assertEquals(PathUtils.concatPath(mLocalTachyonCluster.getTachyonHome(), "testFile"), Assert.assertEquals(PathUtils.concatPath(mLocalTachyonCluster.getTachyonHome(), "testFile"),
fileInfo.getUfsPath()); fileInfo.getUfsPath());
mFsMaster.completeFileCheckpoint(-1, fileId, 1, mFsMaster.addCheckpoint(-1, fileId, 1,
new TachyonURI(PathUtils.concatPath(mMountPoint, "testPath"))); new TachyonURI(PathUtils.concatPath(mMountPoint, "testPath")));
// TODO(jiri): Check the file exists in UFS. // TODO(jiri): Check the file exists in UFS.
} }
Expand Down Expand Up @@ -589,7 +589,7 @@ public void lastModificationTimeAddCheckpointTest() throws FileDoesNotExistExcep
mFsMaster.createFile(new TachyonURI(PathUtils.concatPath(mMountPoint, "testFile")), mFsMaster.createFile(new TachyonURI(PathUtils.concatPath(mMountPoint, "testFile")),
Constants.DEFAULT_BLOCK_SIZE_BYTE, true); Constants.DEFAULT_BLOCK_SIZE_BYTE, true);
long opTimeMs = System.currentTimeMillis(); long opTimeMs = System.currentTimeMillis();
mFsMaster.completeFileCheckpointInternal(-1, fileId, 1, mFsMaster.addCheckpointInternal(-1, fileId, 1,
new TachyonURI(PathUtils.concatPath(mMountPoint, "testPath")), opTimeMs); new TachyonURI(PathUtils.concatPath(mMountPoint, "testPath")), opTimeMs);
FileInfo fileInfo = mFsMaster.getFileInfo(fileId); FileInfo fileInfo = mFsMaster.getFileInfo(fileId);
Assert.assertEquals(opTimeMs, fileInfo.lastModificationTimeMs); Assert.assertEquals(opTimeMs, fileInfo.lastModificationTimeMs);
Expand Down Expand Up @@ -689,28 +689,31 @@ public void listFilesTest() throws InvalidPathException, FileDoesNotExistExcepti
public void lsTest() throws FileAlreadyExistException, InvalidPathException, TachyonException, public void lsTest() throws FileAlreadyExistException, InvalidPathException, TachyonException,
BlockInfoException, FileDoesNotExistException { BlockInfoException, FileDoesNotExistException {
for (int i = 0; i < 10; i ++) { for (int i = 0; i < 10; i ++) {
mFsMaster.mkdirs(new TachyonURI("/i" + i), true); mFsMaster.mkdirs(new TachyonURI(PathUtils.concatPath(mMountPoint, "i" + i)), true);
for (int j = 0; j < 10; j ++) { for (int j = 0; j < 10; j ++) {
mFsMaster.createFile(new TachyonURI("/i" + i + "/j" + j), 64, true); mFsMaster.createFile(
new TachyonURI(PathUtils.concatPath(mMountPoint, "i" + i, "j" + j)), 64, true);
} }
} }


Assert.assertEquals(1, Assert.assertEquals(1,
mFsMaster.getFileInfoList(mFsMaster.getFileId(new TachyonURI("/i0/j0"))).size()); mFsMaster.getFileInfoList(mFsMaster.getFileId(
new TachyonURI(PathUtils.concatPath(mMountPoint, "i0", "j0")))).size());
for (int i = 0; i < 10; i ++) { for (int i = 0; i < 10; i ++) {
Assert.assertEquals(10, Assert.assertEquals(10,
mFsMaster.getFileInfoList(mFsMaster.getFileId(new TachyonURI("/i" + i))).size()); mFsMaster.getFileInfoList(mFsMaster.getFileId(
new TachyonURI(PathUtils.concatPath(mMountPoint, "i" + i)))).size());
} }
Assert.assertEquals(10, Assert.assertEquals(10,
mFsMaster.getFileInfoList(mFsMaster.getFileId(new TachyonURI("/"))).size()); mFsMaster.getFileInfoList(mFsMaster.getFileId(new TachyonURI(mMountPoint))).size());
} }


@Test @Test
public void notFileCheckpointTest() throws FileDoesNotExistException, SuspectedFileSizeException, public void notFileCheckpointTest() throws FileDoesNotExistException, SuspectedFileSizeException,
FileAlreadyExistException, InvalidPathException, BlockInfoException, TachyonException { FileAlreadyExistException, InvalidPathException, BlockInfoException, TachyonException {
mThrown.expect(FileDoesNotExistException.class); mThrown.expect(FileDoesNotExistException.class);
mFsMaster.mkdirs(new TachyonURI(PathUtils.concatPath(mMountPoint, "testFile")), true); mFsMaster.mkdirs(new TachyonURI(PathUtils.concatPath(mMountPoint, "testFile")), true);
mFsMaster.completeFileCheckpoint(-1, mFsMaster.addCheckpoint(-1,
mFsMaster.getFileId(new TachyonURI(PathUtils.concatPath(mMountPoint, "testFile"))), 0, mFsMaster.getFileId(new TachyonURI(PathUtils.concatPath(mMountPoint, "testFile"))), 0,
new TachyonURI(PathUtils.concatPath(mMountPoint, "testPath"))); new TachyonURI(PathUtils.concatPath(mMountPoint, "testPath")));
} }
Expand Down
128 changes: 70 additions & 58 deletions servers/src/main/java/tachyon/master/file/FileSystemMaster.java
Expand Up @@ -162,7 +162,7 @@ public void processJournalEntry(JournalEntry entry) throws IOException {
} else if (entry instanceof CompleteFileEntry) { } else if (entry instanceof CompleteFileEntry) {
completeFileFromEntry((CompleteFileEntry) entry); completeFileFromEntry((CompleteFileEntry) entry);
} else if (entry instanceof AddCheckpointEntry) { } else if (entry instanceof AddCheckpointEntry) {
completeFileCheckpointFromEntry((AddCheckpointEntry) entry); addCheckpointFromEntry((AddCheckpointEntry) entry);
} else if (entry instanceof SetPinnedEntry) { } else if (entry instanceof SetPinnedEntry) {
setPinnedFromEntry((SetPinnedEntry) entry); setPinnedFromEntry((SetPinnedEntry) entry);
} else if (entry instanceof DeleteFileEntry) { } else if (entry instanceof DeleteFileEntry) {
Expand Down Expand Up @@ -213,7 +213,7 @@ public void stop() throws IOException {
} }


/** /**
* Completes a file checkpoint in ufs. Called via RPC. * Adds a file checkpoint in ufs. Called via RPC.
* *
* @param workerId the worker id completing the ufs checkpoint * @param workerId the worker id completing the ufs checkpoint
* @param fileId the file id associated with the ufs checkpoint * @param fileId the file id associated with the ufs checkpoint
Expand All @@ -224,13 +224,13 @@ public void stop() throws IOException {
* @throws BlockInfoException * @throws BlockInfoException
* @throws FileDoesNotExistException * @throws FileDoesNotExistException
*/ */
public boolean completeFileCheckpoint(long workerId, long fileId, long length, public boolean addCheckpoint(long workerId, long fileId, long length,
TachyonURI checkpointPath) TachyonURI checkpointPath)
throws SuspectedFileSizeException, BlockInfoException, FileDoesNotExistException { throws SuspectedFileSizeException, BlockInfoException, FileDoesNotExistException {
synchronized (mInodeTree) { synchronized (mInodeTree) {
long opTimeMs = System.currentTimeMillis(); long opTimeMs = System.currentTimeMillis();
LOG.info(FormatUtils.parametersToString(workerId, fileId, length, checkpointPath)); LOG.info(FormatUtils.parametersToString(workerId, fileId, length, checkpointPath));
if (completeFileCheckpointInternal(workerId, fileId, length, checkpointPath, opTimeMs)) { if (addCheckpointInternal(workerId, fileId, length, checkpointPath, opTimeMs)) {
writeJournalEntry( writeJournalEntry(
new AddCheckpointEntry(workerId, fileId, length, checkpointPath, opTimeMs)); new AddCheckpointEntry(workerId, fileId, length, checkpointPath, opTimeMs));
flushJournal(); flushJournal();
Expand All @@ -244,7 +244,7 @@ public boolean completeFileCheckpoint(long workerId, long fileId, long length,
* *
* @return true if the operation should be written to the journal * @return true if the operation should be written to the journal
*/ */
boolean completeFileCheckpointInternal(long workerId, long fileId, long length, boolean addCheckpointInternal(long workerId, long fileId, long length,
TachyonURI checkpointPath, long opTimeMs) throws SuspectedFileSizeException, TachyonURI checkpointPath, long opTimeMs) throws SuspectedFileSizeException,
BlockInfoException, FileDoesNotExistException { BlockInfoException, FileDoesNotExistException {


Expand Down Expand Up @@ -278,28 +278,25 @@ boolean completeFileCheckpointInternal(long workerId, long fileId, long length,
file.setPersisted(true); file.setPersisted(true);
needLog = true; needLog = true;


synchronized (mDependencyMap) { Dependency dep = mDependencyMap.getFromFileId(fileId);
Dependency dep = mDependencyMap.getFromFileId(fileId); if (dep != null) {
if (dep != null) { dep.childCheckpointed(fileId);
dep.childCheckpointed(fileId); if (dep.hasCheckpointed()) {
if (dep.hasCheckpointed()) { mDependencyMap.removeUncheckpointedDependency(dep);
mDependencyMap.removeUncheckpointedDependency(dep); mDependencyMap.removePriorityDependency(dep);
mDependencyMap.removePriorityDependency(dep);
}
} }
} }
} }
mDependencyMap.addFileCheckpoint(fileId);
file.setLastModificationTimeMs(opTimeMs); file.setLastModificationTimeMs(opTimeMs);
file.setCompleted(length); file.setCompleted(length);
MasterContext.getMasterSource().incFilesCheckpointed(); MasterContext.getMasterSource().incFilesCheckpointed();
// TODO(calvin): This probably should always be true since the last mod time is updated. // TODO(calvin): This probably should always be true since the last mod time is updated.
return needLog; return needLog;
} }


private void completeFileCheckpointFromEntry(AddCheckpointEntry entry) { private void addCheckpointFromEntry(AddCheckpointEntry entry) {
try { try {
completeFileCheckpointInternal(entry.getWorkerId(), entry.getFileId(), entry.getFileLength(), addCheckpointInternal(entry.getWorkerId(), entry.getFileId(), entry.getFileLength(),
entry.getCheckpointPath(), entry.getOperationTimeMs()); entry.getCheckpointPath(), entry.getOperationTimeMs());
} catch (FileDoesNotExistException fdnee) { } catch (FileDoesNotExistException fdnee) {
throw new RuntimeException(fdnee); throw new RuntimeException(fdnee);
Expand Down Expand Up @@ -353,11 +350,12 @@ public FileInfo getFileInfo(long fileId) throws FileDoesNotExistException {
MasterContext.getMasterSource().incGetFileStatusOps(); MasterContext.getMasterSource().incGetFileStatusOps();
synchronized (mInodeTree) { synchronized (mInodeTree) {
Inode inode = mInodeTree.getInodeById(fileId); Inode inode = mInodeTree.getInodeById(fileId);
return getFileInfo(inode); return getFileInfoInternal(inode);
} }
} }


private FileInfo getFileInfo(Inode inode) throws FileDoesNotExistException { // This function should only be called from within synchronized (mInodeTree) blocks.
private FileInfo getFileInfoInternal(Inode inode) throws FileDoesNotExistException {
FileInfo fileInfo = inode.generateClientFileInfo(mInodeTree.getPath(inode).toString()); FileInfo fileInfo = inode.generateClientFileInfo(mInodeTree.getPath(inode).toString());
fileInfo.inMemoryPercentage = getInMemoryPercentage(inode); fileInfo.inMemoryPercentage = getInMemoryPercentage(inode);
TachyonURI path = mInodeTree.getPath(inode); TachyonURI path = mInodeTree.getPath(inode);
Expand Down Expand Up @@ -386,10 +384,10 @@ public List<FileInfo> getFileInfoList(long fileId) throws FileDoesNotExistExcept
List<FileInfo> ret = new ArrayList<FileInfo>(); List<FileInfo> ret = new ArrayList<FileInfo>();
if (inode.isDirectory()) { if (inode.isDirectory()) {
for (Inode child : ((InodeDirectory) inode).getChildren()) { for (Inode child : ((InodeDirectory) inode).getChildren()) {
ret.add(getFileInfo(child)); ret.add(getFileInfoInternal(child));
} }
} else { } else {
ret.add(getFileInfo(inode)); ret.add(getFileInfoInternal(inode));
} }
return ret; return ret;
} }
Expand Down Expand Up @@ -438,6 +436,7 @@ public void completeFile(long fileId) throws FileDoesNotExistException, BlockInf
} }
} }


// This function should only be called from within synchronized (mInodeTree) blocks.
void completeFileInternal(List<Long> blockIds, long fileId, long fileLength, long opTimeMs) void completeFileInternal(List<Long> blockIds, long fileId, long fileLength, long opTimeMs)
throws FileDoesNotExistException { throws FileDoesNotExistException {
mDependencyMap.addFileCheckpoint(fileId); mDependencyMap.addFileCheckpoint(fileId);
Expand Down Expand Up @@ -482,6 +481,7 @@ public long createFile(TachyonURI path, long blockSizeBytes, boolean recursive)
} }
} }


// This function should only be called from within synchronized (mInodeTree) blocks.
InodeTree.CreatePathResult createFileInternal(TachyonURI path, long blockSizeBytes, InodeTree.CreatePathResult createFileInternal(TachyonURI path, long blockSizeBytes,
boolean recursive, long opTimeMs) throws InvalidPathException, FileAlreadyExistException, boolean recursive, long opTimeMs) throws InvalidPathException, FileAlreadyExistException,
BlockInfoException { BlockInfoException {
Expand All @@ -505,7 +505,7 @@ InodeTree.CreatePathResult createFileInternal(TachyonURI path, long blockSizeByt
* @throws FileDoesNotExistException * @throws FileDoesNotExistException
*/ */
public long getNewBlockIdForFile(long fileId) throws FileDoesNotExistException { public long getNewBlockIdForFile(long fileId) throws FileDoesNotExistException {
Inode inode = null; Inode inode;
synchronized (mInodeTree) { synchronized (mInodeTree) {
inode = mInodeTree.getInodeById(fileId); inode = mInodeTree.getInodeById(fileId);
} }
Expand Down Expand Up @@ -570,9 +570,12 @@ private void deleteFileFromEntry(DeleteFileEntry entry) {
boolean deleteFileInternal(long fileId, boolean recursive, long opTimeMs) boolean deleteFileInternal(long fileId, boolean recursive, long opTimeMs)
throws TachyonException, FileDoesNotExistException { throws TachyonException, FileDoesNotExistException {
Inode inode = mInodeTree.getInodeById(fileId); Inode inode = mInodeTree.getInodeById(fileId);
return deleteInodeInternal(inode, recursive, opTimeMs); synchronized (mInodeTree) {
return deleteInodeInternal(inode, recursive, opTimeMs);
}
} }


// This function should only be called from within synchronized (mInodeTree) blocks.
private boolean deleteInodeInternal(Inode inode, boolean recursive, long opTimeMs) private boolean deleteInodeInternal(Inode inode, boolean recursive, long opTimeMs)
throws TachyonException, FileDoesNotExistException { throws TachyonException, FileDoesNotExistException {
if (inode == null) { if (inode == null) {
Expand Down Expand Up @@ -691,7 +694,9 @@ public List<FileBlockInfo> getFileBlockInfoList(long fileId) throws FileDoesNotE
public List<FileBlockInfo> getFileBlockInfoList(TachyonURI path) public List<FileBlockInfo> getFileBlockInfoList(TachyonURI path)
throws FileDoesNotExistException, InvalidPathException { throws FileDoesNotExistException, InvalidPathException {
long fileId = getFileId(path); long fileId = getFileId(path);
return getFileBlockInfoList(fileId); synchronized (mInodeTree) {
return getFileBlockInfoList(fileId);
}
} }


/** /**
Expand All @@ -702,6 +707,7 @@ public List<FileBlockInfo> getFileBlockInfoList(TachyonURI path)
* @param blockInfo the {@link BlockInfo} to generate the {@link FileBlockInfo} from * @param blockInfo the {@link BlockInfo} to generate the {@link FileBlockInfo} from
* @return a new {@link FileBlockInfo} for the block * @return a new {@link FileBlockInfo} for the block
*/ */
// This function should only be called from within synchronized (mInodeTree) blocks.
private FileBlockInfo generateFileBlockInfo(InodeFile file, BlockInfo blockInfo) { private FileBlockInfo generateFileBlockInfo(InodeFile file, BlockInfo blockInfo) {
FileBlockInfo fileBlockInfo = new FileBlockInfo(); FileBlockInfo fileBlockInfo = new FileBlockInfo();


Expand All @@ -716,7 +722,7 @@ private FileBlockInfo generateFileBlockInfo(InodeFile file, BlockInfo blockInfo)
// locations from the under storage system. // locations from the under storage system.
String ufsPath = mMountTable.resolve(mInodeTree.getPath(file)).toString(); String ufsPath = mMountTable.resolve(mInodeTree.getPath(file)).toString();
UnderFileSystem ufs = UnderFileSystem.get(ufsPath, MasterContext.getConf()); UnderFileSystem ufs = UnderFileSystem.get(ufsPath, MasterContext.getConf());
List<String> locs = null; List<String> locs;
try { try {
locs = ufs.getFileLocations(ufsPath, fileBlockInfo.offset); locs = ufs.getFileLocations(ufsPath, fileBlockInfo.offset);
} catch (IOException e) { } catch (IOException e) {
Expand Down Expand Up @@ -905,7 +911,7 @@ public boolean rename(long fileId, TachyonURI dstPath)
String[] dstComponents = PathUtils.getPathComponents(dstPath.toString()); String[] dstComponents = PathUtils.getPathComponents(dstPath.toString());
if (srcComponents.length < dstComponents.length) { if (srcComponents.length < dstComponents.length) {
boolean isPrefix = true; boolean isPrefix = true;
for (int prefixInd = 0; prefixInd < srcComponents.length; prefixInd ++) { for (int prefixInd = 0; prefixInd < srcComponents.length; prefixInd++) {
if (!srcComponents[prefixInd].equals(dstComponents[prefixInd])) { if (!srcComponents[prefixInd].equals(dstComponents[prefixInd])) {
isPrefix = false; isPrefix = false;
break; break;
Expand Down Expand Up @@ -941,7 +947,7 @@ public boolean rename(long fileId, TachyonURI dstPath)
renameInternal(fileId, dstPath, opTimeMs); renameInternal(fileId, dstPath, opTimeMs);


// If the source file is persisted, rename it in the UFS. // If the source file is persisted, rename it in the UFS.
FileInfo fileInfo = getFileInfo(srcInode); FileInfo fileInfo = getFileInfoInternal(srcInode);
if (fileInfo.isPersisted) { if (fileInfo.isPersisted) {
TachyonURI ufsSrcPath = mMountTable.resolve(srcPath); TachyonURI ufsSrcPath = mMountTable.resolve(srcPath);
TachyonURI ufsDstPath = mMountTable.resolve(dstPath); TachyonURI ufsDstPath = mMountTable.resolve(dstPath);
Expand All @@ -966,6 +972,7 @@ public boolean rename(long fileId, TachyonURI dstPath)
} }
} }


// This function should only be called from within synchronized (mInodeTree) blocks.
void renameInternal(long fileId, TachyonURI dstPath, long opTimeMs) throws InvalidPathException, void renameInternal(long fileId, TachyonURI dstPath, long opTimeMs) throws InvalidPathException,
FileDoesNotExistException { FileDoesNotExistException {
Inode srcInode = mInodeTree.getInodeById(fileId); Inode srcInode = mInodeTree.getInodeById(fileId);
Expand Down Expand Up @@ -1008,6 +1015,7 @@ public void setPinned(long fileId, boolean pinned) throws FileDoesNotExistExcept
} }
} }


// This function should only be called from within synchronized (mInodeTree) blocks.
private void setPinnedInternal(long fileId, boolean pinned, long opTimeMs) private void setPinnedInternal(long fileId, boolean pinned, long opTimeMs)
throws FileDoesNotExistException { throws FileDoesNotExistException {
Inode inode = mInodeTree.getInodeById(fileId); Inode inode = mInodeTree.getInodeById(fileId);
Expand Down Expand Up @@ -1070,7 +1078,9 @@ public boolean free(long fileId, boolean recursive) throws FileDoesNotExistExcep
* @throws FileDoesNotExistException raise if the file does not exist. * @throws FileDoesNotExistException raise if the file does not exist.
*/ */
public TachyonURI getPath(long fileId) throws FileDoesNotExistException { public TachyonURI getPath(long fileId) throws FileDoesNotExistException {
return mInodeTree.getPath(mInodeTree.getInodeById(fileId)); synchronized (mInodeTree) {
return mInodeTree.getPath(mInodeTree.getInodeById(fileId));
}
} }


/** /**
Expand Down Expand Up @@ -1112,51 +1122,50 @@ public DependencyInfo getClientDependencyInfo(int dependencyId)
} }


public void requestFilesInDependency(int dependencyId) { public void requestFilesInDependency(int dependencyId) {
synchronized (mDependencyMap) { Dependency dependency = mDependencyMap.getFromDependencyId(dependencyId);
Dependency dependency = mDependencyMap.getFromDependencyId(dependencyId); if (dependency != null) {
if (dependency != null) { LOG.info("Request files in dependency " + dependency);
LOG.info("Request files in dependency " + dependency); if (dependency.hasLostFile()) {
if (dependency.hasLostFile()) { mDependencyMap.recomputeDependency(dependencyId);
mDependencyMap.recomputeDependency(dependencyId);
}
} else {
LOG.error("There is no dependency with id " + dependencyId);
} }
} else {
LOG.error("There is no dependency with id " + dependencyId);
} }
} }


public void reportLostFile(long fileId) throws FileDoesNotExistException { public void reportLostFile(long fileId) throws FileDoesNotExistException {
InodeFile iFile;
synchronized (mInodeTree) { synchronized (mInodeTree) {
Inode inode = mInodeTree.getInodeById(fileId); Inode inode = mInodeTree.getInodeById(fileId);
if (inode.isDirectory()) { if (inode.isDirectory()) {
LOG.warn("Reported file is a directory " + inode); LOG.warn("Reported file is a directory " + inode);
return; return;
} }
InodeFile iFile = (InodeFile) inode; iFile = (InodeFile) inode;

}
if (mDependencyMap.addLostFile(fileId) == null) { if (mDependencyMap.addLostFile(fileId) == null) {
LOG.error("There is no dependency info for " + iFile + " . No recovery on that"); LOG.error("There is no dependency info for " + iFile + " . No recovery on that");
} else { } else {
LOG.info("Reported file loss. Tachyon will recompute it: " + iFile); LOG.info("Reported file loss. Tachyon will recompute it: " + iFile);
}
} }
} }


public List<Integer> getPriorityDependencyList() { public List<Integer> getPriorityDependencyList() {
synchronized (mDependencyMap) { return mDependencyMap.getPriorityDependencyList();
return mDependencyMap.getPriorityDependencyList();
}
} }


public long loadFileInfoFromUfs(TachyonURI path, boolean recursive) throws TachyonException { public long loadFileInfoFromUfs(TachyonURI path, boolean recursive) throws TachyonException {
TachyonURI ufsPath = mMountTable.resolve(path); TachyonURI ufsPath;
synchronized (mInodeTree) {
ufsPath = mMountTable.resolve(path);
}
UnderFileSystem underfs = UnderFileSystem.get(ufsPath.toString(), MasterContext.getConf()); UnderFileSystem underfs = UnderFileSystem.get(ufsPath.toString(), MasterContext.getConf());
try { try {
long ufsBlockSizeByte = underfs.getBlockSizeByte(ufsPath.toString()); long ufsBlockSizeByte = underfs.getBlockSizeByte(ufsPath.toString());
long fileSizeByte = underfs.getFileSize(ufsPath.toString()); long fileSizeByte = underfs.getFileSize(ufsPath.toString());
long fileId = createFile(path, ufsBlockSizeByte, recursive); long fileId = createFile(path, ufsBlockSizeByte, recursive);
if (fileId != -1) { if (fileId != -1) {
completeFileCheckpoint(-1, fileId, fileSizeByte, ufsPath); addCheckpoint(-1, fileId, fileSizeByte, ufsPath);
} }
return fileId; return fileId;
} catch (BlockInfoException e) { } catch (BlockInfoException e) {
Expand All @@ -1183,22 +1192,25 @@ public long loadFileInfoFromUfs(TachyonURI path, boolean recursive) throws Tachy
public boolean mount(TachyonURI tachyonPath, TachyonURI ufsPath) public boolean mount(TachyonURI tachyonPath, TachyonURI ufsPath)
throws FileAlreadyExistException, InvalidPathException { throws FileAlreadyExistException, InvalidPathException {
mkdirs(tachyonPath, true); mkdirs(tachyonPath, true);
if (mMountTable.add(tachyonPath, ufsPath)) { synchronized (mInodeTree) {
writeJournalEntry(new AddMountPointEntry(tachyonPath, ufsPath)); if (mMountTable.add(tachyonPath, ufsPath)) {
flushJournal(); writeJournalEntry(new AddMountPointEntry(tachyonPath, ufsPath));
return true; flushJournal();
return true;
}
} }
return false; return false;
} }


// TODO(jiri): Account for asynchronously persisted files once lineage is implemented.
public boolean unmount(TachyonURI tachyonPath) throws FileDoesNotExistException, public boolean unmount(TachyonURI tachyonPath) throws FileDoesNotExistException,
InvalidPathException { InvalidPathException {
if (mMountTable.delete(tachyonPath)) { synchronized (mInodeTree) {
// TODO(jiri): Delete metadata for the Tachyon namespace nested under tachyonPath. if (mMountTable.delete(tachyonPath)) {
writeJournalEntry(new DeleteMountPointEntry(tachyonPath)); // TODO(jiri): Delete metadata for the Tachyon namespace nested under tachyonPath.
flushJournal(); writeJournalEntry(new DeleteMountPointEntry(tachyonPath));
return true; flushJournal();
return true;
}
} }
return false; return false;
} }
Expand Down
Expand Up @@ -58,7 +58,7 @@ public List<Integer> workerGetPriorityDependencyList() {
@Override @Override
public boolean addCheckpoint(long workerId, long fileId, long length, String checkpointPath) public boolean addCheckpoint(long workerId, long fileId, long length, String checkpointPath)
throws BlockInfoException, FileDoesNotExistException, SuspectedFileSizeException { throws BlockInfoException, FileDoesNotExistException, SuspectedFileSizeException {
return mFileSystemMaster.completeFileCheckpoint(workerId, fileId, length, new TachyonURI( return mFileSystemMaster.addCheckpoint(workerId, fileId, length, new TachyonURI(
checkpointPath)); checkpointPath));
} }


Expand Down

0 comments on commit 2a3bb1a

Please sign in to comment.