Skip to content

Commit

Permalink
Using TachyonURI and creating directories on mount.
Browse files Browse the repository at this point in the history
  • Loading branch information
jsimsa committed Sep 16, 2015
1 parent 7d54019 commit 07d4194
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 64 deletions.
3 changes: 1 addition & 2 deletions clients/unshaded/src/main/java/tachyon/client/TachyonFS.java
Expand Up @@ -342,8 +342,7 @@ public synchronized long createFile(TachyonURI path, TachyonURI ufsPath, long bl
if (blockSizeByte > 0) { if (blockSizeByte > 0) {
return mFSMasterClient.createFile(path.getPath(), blockSizeByte, recursive); return mFSMasterClient.createFile(path.getPath(), blockSizeByte, recursive);
} else { } else {
return mFSMasterClient.loadFileInfoFromUfs(path.getPath(), ufsPath.toString(), return mFSMasterClient.loadFileFromUfs(path.getPath(), recursive);
blockSizeByte, recursive);
} }
} catch (TException e) { } catch (TException e) {
throw new IOException(e); throw new IOException(e);
Expand Down
Expand Up @@ -202,21 +202,19 @@ public List<FileInfo> listStatus(TachyonFile file) throws IOException, FileDoesN
* updated and no data will be transferred. The data can be added to Tachyon space by doing an * updated and no data will be transferred. The data can be added to Tachyon space by doing an
* operation with the cache option specified, for example reading. * operation with the cache option specified, for example reading.
* *
* @param path the path to create the file in Tachyon * @param path the Tachyon path of the file to load
* @param ufsPath the under storage system path of the file that will back the Tachyon file
* @param recursive if true, the parent directories to the file in Tachyon will be created * @param recursive if true, the parent directories to the file in Tachyon will be created
* @return the file id of the resulting file in Tachyon * @return the file id of the resulting file in Tachyon
* @throws FileDoesNotExistException if there is no file at the given path * @throws FileDoesNotExistException if there is no file at the given path
* @throws IOException if the Tachyon path is invalid or the ufsPath does not exist * @throws IOException if the Tachyon path is invalid or the ufsPath does not exist
*/ */
public long loadFileInfoFromUfs(TachyonURI path, TachyonURI ufsPath, boolean recursive) public long loadFileFromUfs(TachyonURI path, boolean recursive)
throws IOException, FileDoesNotExistException { throws IOException, FileDoesNotExistException {
FileSystemMasterClient masterClient = mContext.acquireMasterClient(); FileSystemMasterClient masterClient = mContext.acquireMasterClient();
try { try {
long fileId = long fileId =
masterClient.loadFileInfoFromUfs(path.getPath(), ufsPath.toString(), -1L, recursive); masterClient.loadFileFromUfs(path.getPath(), recursive);
LOG.info( LOG.info("Loaded file " + path.getPath() + (recursive ? " recursively" : ""));
"Loaded file " + path.getPath() + " from " + ufsPath + (recursive ? " recursively" : ""));
return fileId; return fileId;
} finally { } finally {
mContext.releaseMasterClient(masterClient); mContext.releaseMasterClient(masterClient);
Expand Down
Expand Up @@ -270,20 +270,19 @@ public synchronized long createFile(String path, long blockSizeBytes, boolean re
/** /**
* Loads a file from the under file system. * Loads a file from the under file system.
* *
* @param path the file path * @param path the Tachyon path of the file
* @param ufsPath the under file system path
* @param recursive whether parent directories should be loaded if not present yet * @param recursive whether parent directories should be loaded if not present yet
* @return the file id * @return the file id
* @throws FileDoesNotExistException if the file does not exist * @throws FileDoesNotExistException if the file does not exist
* @throws IOException if an I/O error occurs * @throws IOException if an I/O error occurs
*/ */
public synchronized long loadFileInfoFromUfs(String path, String ufsPath, boolean recursive) public synchronized long loadFileFromUfs(String path, boolean recursive)
throws IOException, FileDoesNotExistException { throws IOException, FileDoesNotExistException {
int retry = 0; int retry = 0;
while (!mClosed && (retry ++) <= RPC_MAX_NUM_RETRY) { while (!mClosed && (retry ++) <= RPC_MAX_NUM_RETRY) {
connect(); connect();
try { try {
return mClient.loadFileFromUfs(ufsPath, recursive); return mClient.loadFileFromUfs(path, recursive);
} catch (FileDoesNotExistException e) { } catch (FileDoesNotExistException e) {
throw e; throw e;
} catch (TException e) { } catch (TException e) {
Expand Down
Expand Up @@ -114,7 +114,7 @@ public void AddCheckpointTest() throws Exception {
FileInfo fInfo = mTfs.getInfo(mTfs.open(new TachyonURI("/xyz"))); FileInfo fInfo = mTfs.getInfo(mTfs.open(new TachyonURI("/xyz")));
TachyonURI ckPath = new TachyonURI("/xyz_ck"); TachyonURI ckPath = new TachyonURI("/xyz_ck");
// TODO(cc): What's the counterpart in the new client API for this? // TODO(cc): What's the counterpart in the new client API for this?
mTfs.loadFileInfoFromUfs(new TachyonURI("/xyz_ck"), new TachyonURI(fInfo.getUfsPath()), true); mTfs.loadFileFromUfs(new TachyonURI("/xyz_ck"), true);
FileInfo ckFileInfo = mTfs.getInfo(mTfs.open(ckPath)); FileInfo ckFileInfo = mTfs.getInfo(mTfs.open(ckPath));
mLocalTachyonCluster.stopTFS(); mLocalTachyonCluster.stopTFS();
AddCheckpointTestUtil(fInfo, ckFileInfo); AddCheckpointTestUtil(fInfo, ckFileInfo);
Expand Down
22 changes: 11 additions & 11 deletions servers/src/main/java/tachyon/master/file/FileSystemMaster.java
Expand Up @@ -1091,16 +1091,13 @@ public List<Integer> getPriorityDependencyList() {
} }
} }


public long loadFileFromUfs(String ufsPath, boolean recursive) throws TachyonException { public long loadFileFromUfs(TachyonURI tachyonPath, boolean recursive) throws TachyonException {
if (ufsPath == null || ufsPath.isEmpty()) { String ufsPath = mMountTable.lookup(tachyonPath).toString();
throw new IllegalArgumentException("the underFS path is not provided");
}
UnderFileSystem underfs = UnderFileSystem.get(ufsPath, MasterContext.getConf()); UnderFileSystem underfs = UnderFileSystem.get(ufsPath, MasterContext.getConf());
try { try {
long ufsBlockSizeByte = underfs.getBlockSizeByte(ufsPath); long ufsBlockSizeByte = underfs.getBlockSizeByte(ufsPath);
long fileSizeByte = underfs.getFileSize(ufsPath); long fileSizeByte = underfs.getFileSize(ufsPath);
String path = mMountTable.reverseLookup(ufsPath); long fileId = createFile(tachyonPath, ufsBlockSizeByte, recursive);
long fileId = createFile(new TachyonURI(path), ufsBlockSizeByte, recursive);
if (fileId != -1) { if (fileId != -1) {
completeFileCheckpoint(-1, fileId, fileSizeByte, new TachyonURI(ufsPath)); completeFileCheckpoint(-1, fileId, fileSizeByte, new TachyonURI(ufsPath));
} }
Expand All @@ -1115,22 +1112,25 @@ public long loadFileFromUfs(String ufsPath, boolean recursive) throws TachyonExc
throw new TachyonException(ipe.getMessage()); throw new TachyonException(ipe.getMessage());
} catch (IOException ioe) { } catch (IOException ioe) {
throw new TachyonException(ioe.getMessage()); throw new TachyonException(ioe.getMessage());
} catch (NotFoundException nfe) {
throw new TachyonException(nfe.getMessage());
} catch (SuspectedFileSizeException sfse) { } catch (SuspectedFileSizeException sfse) {
throw new TachyonException(sfse.getMessage()); throw new TachyonException(sfse.getMessage());
} }
} }


public void mount(String tachyonPath, String ufsPath) throws AlreadyExistsException { public void mount(TachyonURI tachyonPath, TachyonURI ufsPath) throws AlreadyExistsException,
FileAlreadyExistException, InvalidPathException {
mkdirs(tachyonPath, true);
mMountTable.add(tachyonPath, ufsPath); mMountTable.add(tachyonPath, ufsPath);
writeJournalEntry(new AddMountPointEntry(tachyonPath, ufsPath)); writeJournalEntry(new AddMountPointEntry(tachyonPath, ufsPath));
flushJournal(); flushJournal();
} }


public void unmount(String tachyonPath) throws NotFoundException { // TODO(jiri): Account for asynchronously persisted files once lineage is implemented.
// TODO(jiri): Persist files nested under tachyonPath and then void its namespace. public void unmount(TachyonURI tachyonPath) throws FileDoesNotExistException,
InvalidPathException, NotFoundException {
mMountTable.delete(tachyonPath); mMountTable.delete(tachyonPath);
long fileId = getFileId(tachyonPath);
// TODO(jiri): Delete the files here.
writeJournalEntry(new DeleteMountPointEntry(tachyonPath)); writeJournalEntry(new DeleteMountPointEntry(tachyonPath));
flushJournal(); flushJournal();
} }
Expand Down
Expand Up @@ -15,7 +15,6 @@


package tachyon.master.file; package tachyon.master.file;


import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
Expand All @@ -34,7 +33,6 @@
import tachyon.thrift.InvalidPathException; import tachyon.thrift.InvalidPathException;
import tachyon.thrift.SuspectedFileSizeException; import tachyon.thrift.SuspectedFileSizeException;
import tachyon.thrift.TachyonException; import tachyon.thrift.TachyonException;
import tachyon.underfs.UnderFileSystem;


public final class FileSystemMasterServiceHandler implements FileSystemMasterService.Iface { public final class FileSystemMasterServiceHandler implements FileSystemMasterService.Iface {
private final FileSystemMaster mFileSystemMaster; private final FileSystemMaster mFileSystemMaster;
Expand Down Expand Up @@ -170,22 +168,30 @@ public void requestFilesInDependency(int depId) throws DependencyDoesNotExistExc


@Override @Override
public long loadFileFromUfs(String ufsPath, boolean recursive) throws TachyonException { public long loadFileFromUfs(String ufsPath, boolean recursive) throws TachyonException {
return mFileSystemMaster.loadFileFromUfs(ufsPath, recursive); return mFileSystemMaster.loadFileFromUfs(new TachyonURI(ufsPath), recursive);
} }


@Override @Override
public void mount(String tachyonPath, String ufsPath) throws TachyonException { public void mount(String tachyonPath, String ufsPath) throws TachyonException {
try { try {
mFileSystemMaster.mount(tachyonPath, ufsPath); mFileSystemMaster.mount(new TachyonURI(tachyonPath), new TachyonURI(ufsPath));
} catch (AlreadyExistsException aee) { } catch (AlreadyExistsException aee) {
throw new TachyonException(aee.getMessage()); throw new TachyonException(aee.getMessage());
} catch (FileAlreadyExistException faee) {
throw new TachyonException(faee.getMessage());
} catch (InvalidPathException ipe) {
throw new TachyonException(ipe.getMessage());
} }
} }


@Override @Override
public void unmount(String tachyonPath) throws TachyonException { public void unmount(String tachyonPath) throws TachyonException {
try { try {
mFileSystemMaster.unmount(tachyonPath); mFileSystemMaster.unmount(new TachyonURI(tachyonPath));
} catch (FileDoesNotExistException fdnee) {
throw new TachyonException(fdnee.getMessage());
} catch (InvalidPathException ipe) {
throw new TachyonException(ipe.getMessage());
} catch (NotFoundException nfe) { } catch (NotFoundException nfe) {
throw new TachyonException(nfe.getMessage()); throw new TachyonException(nfe.getMessage());
} }
Expand Down
Expand Up @@ -19,24 +19,25 @@


import com.google.common.collect.Maps; import com.google.common.collect.Maps;


import tachyon.TachyonURI;
import tachyon.master.journal.JournalEntry; import tachyon.master.journal.JournalEntry;
import tachyon.master.journal.JournalEntryType; import tachyon.master.journal.JournalEntryType;


public class AddMountPointEntry implements JournalEntry { public class AddMountPointEntry implements JournalEntry {
private final String mTachyonPath; private final String mTachyonPath;
private final String mUfsPath; private final String mUfsPath;


public AddMountPointEntry(String tachyonPath, String ufsPath) { public AddMountPointEntry(TachyonURI tachyonPath, TachyonURI ufsPath) {
mTachyonPath = tachyonPath; mTachyonPath = tachyonPath.toString();
mUfsPath = ufsPath; mUfsPath = ufsPath.toString();
} }


public String getTachyonPath() { public TachyonURI getTachyonPath() {
return mTachyonPath; return new TachyonURI(mTachyonPath);
} }


public String getUfsPath() { public TachyonURI getUfsPath() {
return mUfsPath; return new TachyonURI(mUfsPath);
} }


@Override @Override
Expand Down
Expand Up @@ -19,18 +19,19 @@


import com.google.common.collect.Maps; import com.google.common.collect.Maps;


import tachyon.TachyonURI;
import tachyon.master.journal.JournalEntry; import tachyon.master.journal.JournalEntry;
import tachyon.master.journal.JournalEntryType; import tachyon.master.journal.JournalEntryType;


public class DeleteMountPointEntry implements JournalEntry { public class DeleteMountPointEntry implements JournalEntry {
private final String mTachyonPath; private final String mTachyonPath;


public DeleteMountPointEntry(String tachyonPath) { public DeleteMountPointEntry(TachyonURI tachyonPath) {
mTachyonPath = tachyonPath; mTachyonPath = tachyonPath.toString();
} }


public String getTachyonPath() { public TachyonURI getTachyonPath() {
return mTachyonPath; return new TachyonURI(mTachyonPath);
} }


@Override @Override
Expand Down
34 changes: 13 additions & 21 deletions servers/src/main/java/tachyon/master/file/meta/MountTable.java
Expand Up @@ -18,20 +18,22 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;


import tachyon.TachyonURI;
import tachyon.exception.AlreadyExistsException; import tachyon.exception.AlreadyExistsException;
import tachyon.exception.NotFoundException; import tachyon.exception.NotFoundException;


/** This class is used for keeping track of Tachyon mount points. It is thread safe. */ /** This class is used for keeping track of Tachyon mount points. It is thread safe. */
public class MountTable { public class MountTable {
private Map<String, String> mMountTable; private Map<TachyonURI, TachyonURI> mMountTable;


public MountTable() { public MountTable() {
final int INITIAL_CAPACITY = 10; final int INITIAL_CAPACITY = 10;
mMountTable = new HashMap<String, String>(INITIAL_CAPACITY); mMountTable = new HashMap<TachyonURI, TachyonURI>(INITIAL_CAPACITY);
} }


public synchronized void add(String tachyonPath, String ufsPath) throws AlreadyExistsException { public synchronized void add(TachyonURI tachyonPath, TachyonURI ufsPath)
for (Map.Entry<String, String> entry : mMountTable.entrySet()) { throws AlreadyExistsException {
for (Map.Entry<TachyonURI, TachyonURI> entry : mMountTable.entrySet()) {
if (hasPrefix(tachyonPath, entry.getKey())) { if (hasPrefix(tachyonPath, entry.getKey())) {
// Cannot mount a path under an existing mount point. // Cannot mount a path under an existing mount point.
throw new AlreadyExistsException("Tachyon path " + tachyonPath throw new AlreadyExistsException("Tachyon path " + tachyonPath
Expand All @@ -41,36 +43,26 @@ public synchronized void add(String tachyonPath, String ufsPath) throws AlreadyE
mMountTable.put(tachyonPath, ufsPath); mMountTable.put(tachyonPath, ufsPath);
} }


public synchronized void delete(String tachyonPath) throws NotFoundException { public synchronized void delete(TachyonURI tachyonPath) throws NotFoundException {
if (mMountTable.containsKey(tachyonPath)) { if (mMountTable.containsKey(tachyonPath)) {
mMountTable.remove(tachyonPath); mMountTable.remove(tachyonPath);
} }
// Cannot mount a path under an existing mount point. // Cannot mount a path under an existing mount point.
throw new NotFoundException("Tachyon path " + tachyonPath + " is not a valid mount point."); throw new NotFoundException("Tachyon path " + tachyonPath + " is not a valid mount point.");
} }


public synchronized String lookup(String tachyonPath) { public synchronized TachyonURI lookup(TachyonURI tachyonPath) {
for (Map.Entry<String, String> entry : mMountTable.entrySet()) { for (Map.Entry<TachyonURI, TachyonURI> entry : mMountTable.entrySet()) {
if (hasPrefix(tachyonPath, entry.getKey())) { if (hasPrefix(tachyonPath, entry.getKey())) {
return entry.getValue() + tachyonPath.substring(entry.getKey().length()); return new TachyonURI(entry.getValue()
+ tachyonPath.toString().substring(entry.getKey().toString().length()));
} }
} }
// If the given path is not found in the mount table, the lookup is an identity. // If the given path is not found in the mount table, the lookup is an identity.
return tachyonPath; return tachyonPath;
} }


public synchronized String reverseLookup(String ufsPath) throws NotFoundException { private boolean hasPrefix(TachyonURI path, TachyonURI prefix) {
for (Map.Entry<String, String> entry : mMountTable.entrySet()) { return path.toString().startsWith(prefix.toString());
if (hasPrefix(ufsPath, entry.getValue())) {
return entry.getKey() + ufsPath.substring(entry.getValue().length());
}
}
// If the given path is not found in the mount table, the reverse lookup fails.
throw new NotFoundException("UFS path " + ufsPath + " is not mounted in Tachyon namespace.");
}

private boolean hasPrefix(String path, String prefix) {
// TODO(jiri): Use canonical representation for UFS scheme and authority.
return path.startsWith(prefix);
} }
} }
Expand Up @@ -81,7 +81,7 @@ public final class BlockDataManager {
public BlockDataManager(WorkerSource workerSource, public BlockDataManager(WorkerSource workerSource,
WorkerBlockMasterClient workerBlockMasterClient, WorkerBlockMasterClient workerBlockMasterClient,
WorkerFileSystemMasterClient workerFileSystemMasterClient) throws IOException { WorkerFileSystemMasterClient workerFileSystemMasterClient) throws IOException {
// TODO(jiri): We may not need to assign the conf to a variable // TODO(jiri): We may not need to assign the conf to a variable.
mTachyonConf = WorkerContext.getConf(); mTachyonConf = WorkerContext.getConf();
mHeartbeatReporter = new BlockHeartbeatReporter(); mHeartbeatReporter = new BlockHeartbeatReporter();
mBlockStore = new TieredBlockStore(); mBlockStore = new TieredBlockStore();
Expand Down Expand Up @@ -136,8 +136,8 @@ public void accessBlock(long sessionId, long blockId) throws NotFoundException {
* Add the checkpoint information of a file. The information is from the session * Add the checkpoint information of a file. The information is from the session
* <code>sessionId</code>. * <code>sessionId</code>.
* *
* This method is normally triggered from {@link tachyon.client.FileOutStream#close()} if and only * This method is normally triggered from {@link tachyon.client.file.FileOutStream#close()} if and
* if {@link tachyon.client.WriteType#isThrough()} is true. The current implementation of * only if {@link tachyon.client.WriteType#isThrough()} is true. The current implementation of
* checkpointing is that through {@link tachyon.client.WriteType} operations write to * checkpointing is that through {@link tachyon.client.WriteType} operations write to
* {@link tachyon.underfs.UnderFileSystem} on the client's write path, but under a session temp * {@link tachyon.underfs.UnderFileSystem} on the client's write path, but under a session temp
* directory (temp directory is defined in the worker as {@link #getSessionUfsTmpFolder(long)}). * directory (temp directory is defined in the worker as {@link #getSessionUfsTmpFolder(long)}).
Expand Down

0 comments on commit 07d4194

Please sign in to comment.