Skip to content

Commit

Permalink
[ALLUXIO-2743] Allow UnderFileSystem to be keyed by mountId
Browse files Browse the repository at this point in the history
  • Loading branch information
apc999 committed Apr 26, 2017
1 parent 3ab59a0 commit 2647bd3
Show file tree
Hide file tree
Showing 21 changed files with 100 additions and 79 deletions.
Expand Up @@ -72,7 +72,7 @@ private OutStreamOptions() {
mOwner = SecurityUtils.getOwnerFromLoginModule();
mGroup = SecurityUtils.getGroupFromLoginModule();
mMode = Mode.defaults().applyFileUMask();
mMountId = IdUtils.INVALID_UFS_ID;
mMountId = IdUtils.INVALID_MOUNT_ID;
}

/**
Expand Down
91 changes: 62 additions & 29 deletions core/common/src/main/java/alluxio/underfs/UnderFileSystem.java
Expand Up @@ -20,6 +20,7 @@
import alluxio.underfs.options.ListOptions;
import alluxio.underfs.options.MkdirsOptions;
import alluxio.underfs.options.OpenOptions;
import alluxio.util.IdUtils;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -61,36 +62,61 @@ private static final class Cache {
*/
private final ConcurrentHashMap<Key, UnderFileSystem> mUnderFileSystemMap =
new ConcurrentHashMap<>();
/**
* Maps from mount id to {@link UnderFileSystem} instances.
*/
private final ConcurrentHashMap<Long, UnderFileSystem> mIdToUnderFileSystemMap =
new ConcurrentHashMap<>();

private Cache() {}

/**
* Gets a UFS instance from the cache if exists. Otherwise, creates a new instance and adds
* that to the cache.
*
* @param path the ufs path
* @param ufsConf the ufs configuration
* @param path the UFS path
* @param ufsConf the UFS configuration
* @param mountId the mount id, IdUtils.INVALID_MOUNT_ID if there is no mount associated
* @return the UFS instance
*/
UnderFileSystem get(String path, Map<String, String> ufsConf) {
UnderFileSystem getOrAdd(String path, Map<String, String> ufsConf, long mountId) {
Key key = new Key(new AlluxioURI(path), ufsConf);
UnderFileSystem cachedFs = mUnderFileSystemMap.get(key);
if (cachedFs != null) {
if (mountId != IdUtils.INVALID_MOUNT_ID) {
mIdToUnderFileSystemMap.put(mountId, cachedFs);
}
return cachedFs;
}
UnderFileSystem fs = UnderFileSystemRegistry.create(path, ufsConf);
cachedFs = mUnderFileSystemMap.putIfAbsent(key, fs);
if (cachedFs == null) {
if (mountId != IdUtils.INVALID_MOUNT_ID) {
mIdToUnderFileSystemMap.put(mountId, fs);
}
return fs;
}
try {
fs.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
if (mountId != IdUtils.INVALID_MOUNT_ID) {
mIdToUnderFileSystemMap.put(mountId, cachedFs);
}
return cachedFs;
}

/**
* Gets a UFS instance from the cache if exists. Otherwise, returns null.
*
* @param mountId the mount id
* @return the UFS instance
*/
UnderFileSystem get(long mountId) {
return mIdToUnderFileSystemMap.get(mountId);
}

void clear() {
mUnderFileSystemMap.clear();
}
Expand Down Expand Up @@ -135,8 +161,8 @@ public boolean equals(Object object) {
public String toString() {
return Objects.toStringHelper(this)
.add("authority", mAuthority)
.add("property", mProperties)
.add("scheme", mScheme)
.add("property", mProperties)
.toString();
}
}
Expand All @@ -149,27 +175,50 @@ public static void clearCache() {
}

/**
* Gets the {@link UnderFileSystem} instance according to its schema without specific ufs conf.
* Gets the {@link UnderFileSystem} instance according to its UFS path. This method should only
* be used for journal operations and tests.
*
* @param path the file path storing over the ufs
* @return instance of the under layer file system
*/
// TODO(binfan): Remove this method, currently it is only used in tests
public static UnderFileSystem get(String path) {
return get(path, null);
return UFS_CACHE.getOrAdd(path, null, IdUtils.INVALID_MOUNT_ID);
}

/**
* Gets the {@link UnderFileSystem} instance according to its scheme and configuration.
* Gets the {@link UnderFileSystem} instance according to its UFS path. This method should only
* be used for journal operations and tests.
*
* @param path journal path in ufs
* @return the instance of under file system for Alluxio journal directory
*/
public static UnderFileSystem get(URI path) {
return UFS_CACHE.getOrAdd(path.toString(), null, IdUtils.INVALID_MOUNT_ID);
}

/**
* Gets the {@link UnderFileSystem} instance according to its scheme and configuration. This
* method should only be used when a new mount is added or detected.
*
* @param path the path of mount point
* @param ufsConf the configuration object for ufs only
* @param mountId the id of mount point
* @return instance of the under layer file system
*/
public static UnderFileSystem get(String path, Map<String, String> ufsConf) {
Preconditions.checkArgument(path != null, "path may not be null");
public static UnderFileSystem get(String path, Map<String, String> ufsConf, long mountId) {
Preconditions.checkArgument(path != null, "path");

return UFS_CACHE.get(path, ufsConf);
return UFS_CACHE.getOrAdd(path, ufsConf, mountId);
}

/**
* Gets the {@link UnderFileSystem} instance according to its scheme and configuration.
*
* @param mountId the id of mount point
* @return instance of the under layer file system
*/
public static UnderFileSystem get(long mountId) {
return UFS_CACHE.get(mountId);
}

/**
Expand All @@ -179,23 +228,7 @@ public static UnderFileSystem getForRoot() {
String ufsRoot = Configuration.get(PropertyKey.MASTER_MOUNT_TABLE_ROOT_UFS);
Map<String, String> ufsConf = Configuration.getNestedProperties(
PropertyKey.MASTER_MOUNT_TABLE_ROOT_OPTION);
return get(ufsRoot, ufsConf);
}

/**
* @param path journal path in ufs
* @return the instance of under file system for Alluxio journal directory
*/
public static UnderFileSystem getForJournal(URI path) {
return getForJournal(path.toString());
}

/**
* @param path journal path in ufs
* @return the instance of under file system for Alluxio journal directory
*/
public static UnderFileSystem getForJournal(String path) {
return get(path, null);
return get(ufsRoot, ufsConf, IdUtils.INVALID_MOUNT_ID);
}
}

Expand Down Expand Up @@ -243,7 +276,7 @@ public int getValue() {
void close() throws IOException;

/**
* Configures and updates the properties. For instance, this method can add new properties or
* Configures and updates the properties. For instance, this method can getOrAdd new properties or
* modify existing properties specified through {@link #setProperties(Map)}.
*
* The default implementation is a no-op. This should be overridden if a subclass needs
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/main/java/alluxio/util/IdUtils.java
Expand Up @@ -30,7 +30,7 @@ public final class IdUtils {

public static final long INVALID_FILE_ID = -1;
public static final long INVALID_WORKER_ID = -1;
public static final long INVALID_UFS_ID = -1;
public static final long INVALID_MOUNT_ID = -1;
private static SecureRandom sRandom = new SecureRandom();

private IdUtils() {} // prevent instantiation
Expand Down
Expand Up @@ -87,7 +87,7 @@ private Upgrader(String master) {
mJournalV1 = (new Journal.Factory(
getJournalLocation(Configuration.get(PropertyKey.MASTER_JOURNAL_FOLDER)))).create(master);

mUfs = UnderFileSystem.Factory.getForJournal(sJournalDirectoryV0);
mUfs = UnderFileSystem.Factory.get(sJournalDirectoryV0);

mCheckpointV0 = URIUtils.appendPathOrDie(mJournalV0.getLocation(), "checkpoint.data");
mCompletedLogsV0 = URIUtils.appendPathOrDie(mJournalV0.getLocation(), "completed");
Expand Down
Expand Up @@ -81,7 +81,7 @@ public class UfsJournal implements Journal {
* @param location the location for this journal
*/
public UfsJournal(URI location) {
this(location, UnderFileSystem.Factory.getForJournal(location));
this(location, UnderFileSystem.Factory.get(location));
}

/**
Expand Down Expand Up @@ -125,7 +125,7 @@ public long getNextSequenceNumberToCheckpoint() throws IOException {

@Override
public boolean isFormatted() throws IOException {
UnderFileSystem ufs = UnderFileSystem.Factory.getForJournal(mLocation);
UnderFileSystem ufs = UnderFileSystem.Factory.get(mLocation);
UnderFileStatus[] files = ufs.listStatus(mLocation.toString());
if (files == null) {
return false;
Expand Down
Expand Up @@ -50,7 +50,7 @@ public final class UfsJournalFileParser implements JournalFileParser {
*/
public UfsJournalFileParser(URI location) {
mLocation = Preconditions.checkNotNull(location);
mUfs = UnderFileSystem.Factory.getForJournal(mLocation);
mUfs = UnderFileSystem.Factory.get(mLocation);
}

@Override
Expand Down
Expand Up @@ -129,7 +129,7 @@ public JournalReader getReader() {

@Override
public boolean isFormatted() throws IOException {
UnderFileSystem ufs = UnderFileSystem.Factory.getForJournal(mLocation);
UnderFileSystem ufs = UnderFileSystem.Factory.get(mLocation);
UnderFileStatus[] files = ufs.listStatus(mLocation.toString());
if (files == null) {
return false;
Expand Down
Expand Up @@ -53,7 +53,7 @@ public class UfsJournalReader implements JournalReader {
*/
UfsJournalReader(UfsJournal journal) {
mJournal = Preconditions.checkNotNull(journal, "journal");
mUfs = UnderFileSystem.Factory.getForJournal(mJournal.getLocation());
mUfs = UnderFileSystem.Factory.get(mJournal.getLocation());
mCheckpoint = mJournal.getCheckpoint();
}

Expand Down
Expand Up @@ -81,7 +81,7 @@ public final class UfsJournalWriter implements JournalWriter {
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
mUfs = UnderFileSystem.Factory.getForJournal(mJournal.getLocation());
mUfs = UnderFileSystem.Factory.get(mJournal.getLocation());
mCheckpointManager = new UfsCheckpointManager(mUfs, mJournal.getCheckpoint(), this);
}

Expand Down
Expand Up @@ -47,7 +47,7 @@ public UfsMutableJournal(URI location) {
@Override
public void format() throws IOException {
LOG.info("Formatting {}", mLocation);
UnderFileSystem ufs = UnderFileSystem.Factory.getForJournal(mLocation);
UnderFileSystem ufs = UnderFileSystem.Factory.get(mLocation);
if (ufs.isDirectory(mLocation.toString())) {
for (UnderFileStatus p : ufs.listStatus(mLocation.toString())) {
URI childPath;
Expand Down
Expand Up @@ -2234,9 +2234,10 @@ private void mountInternal(LockedInodePath inodePath, AlluxioURI ufsPath, long m
AlluxioURI alluxioPath = inodePath.getUri();

if (!replayed) {
// Check that the ufsPath exists and is a directory
// Ensure the UFS is added into factory with mountId
UnderFileSystem ufs =
UnderFileSystem.Factory.get(ufsPath.toString(), options.getProperties());
UnderFileSystem.Factory.get(ufsPath.toString(), options.getProperties(), mountId);
// Check that the ufsPath exists and is a directory
if (!ufs.isDirectory(ufsPath.toString())) {
throw new IOException(
ExceptionMessage.UFS_PATH_DOES_NOT_EXIST.getMessage(ufsPath.getPath()));
Expand Down
Expand Up @@ -278,8 +278,7 @@ public Resolution resolve(AlluxioURI uri) throws InvalidPathException {
MountInfo info = mMountTable.get(mountPoint);
AlluxioURI ufsUri = info.getUfsUri();
// TODO(gpang): this ufs should probably be cached.
UnderFileSystem ufs = UnderFileSystem.Factory
.get(ufsUri.toString(), info.getOptions().getProperties());
UnderFileSystem ufs = UnderFileSystem.Factory.get(info.getMountId());
AlluxioURI resolvedUri = ufs.resolveUri(ufsUri, path.substring(mountPoint.length()));
return new Resolution(resolvedUri, ufs, info.getOptions().isShared(), info.getMountId());
}
Expand Down
Expand Up @@ -70,8 +70,7 @@ public long getMountId() {
*/
public MountPointInfo toMountPointInfo() {
MountPointInfo info = new MountPointInfo();
UnderFileSystem ufs =
UnderFileSystem.Factory.get(mUfsUri.toString(), getOptions().getProperties());
UnderFileSystem ufs = UnderFileSystem.Factory.get(mMountId);
info.setUfsUri(mUfsUri.toString());
info.setUfsType(ufs.getUnderFSType());
try {
Expand Down
Expand Up @@ -212,7 +212,7 @@ public void writableMount() throws Exception {
String mountPath = "/mnt/foo";
AlluxioURI alluxioUri = new AlluxioURI("alluxio://localhost:1234" + mountPath);
mMountTable
.add(alluxioUri, new AlluxioURI("hdfs://localhost:5678/foo"), IdUtils.INVALID_UFS_ID,
.add(alluxioUri, new AlluxioURI("hdfs://localhost:5678/foo"), IdUtils.INVALID_MOUNT_ID,
MountOptions.defaults());

try {
Expand Down
Expand Up @@ -21,10 +21,6 @@
import com.google.common.io.Closer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.annotation.concurrent.GuardedBy;

/**
* The default implementation of UfsManager to manage the ufs used by different worker services.
Expand All @@ -33,40 +29,33 @@ public final class DefaultUfsManager implements UfsManager {

private final Object mLock = new Object();

/** Map from Alluxio mount point to the corresponding ufs configuration. */
@GuardedBy("mLock")
private final Map<Long, UnderFileSystem> mUfsMap;

private final FileSystemMasterClient mMasterClient;
private final Closer mCloser;

/**
* Constructs an instance of {@link DefaultUfsManager}.
*/
public DefaultUfsManager() {
mUfsMap = new HashMap<>();
mCloser = Closer.create();
mCloser = Closer.create();
mMasterClient = mCloser.register(new FileSystemMasterClient(
NetworkAddressUtils.getConnectAddress(NetworkAddressUtils.ServiceType.MASTER_RPC)));
}

@Override
public UnderFileSystem getUfsByMountId(long mountId) throws IOException {
synchronized (mLock) {
if (!mUfsMap.containsKey(mountId)) {
UfsInfo info;
try {
info = mMasterClient.getUfsInfo(mountId);
} catch (AlluxioException e) {
throw new IOException(e);
}
Preconditions.checkState((info.isSetUri() && info.isSetProperties()));
UnderFileSystem ufs = UnderFileSystem.Factory.get(info.getUri(), info.getProperties());
mUfsMap.put(mountId, ufs);
mCloser.register(ufs);
UnderFileSystem ufs = UnderFileSystem.Factory.get(mountId);
if (ufs == null) {
UfsInfo info;
try {
info = mMasterClient.getUfsInfo(mountId);
} catch (AlluxioException e) {
throw new IOException(e);
}
return mUfsMap.get(mountId);
Preconditions.checkState((info.isSetUri() && info.isSetProperties()));
ufs = UnderFileSystem.Factory.get(info.getUri(), info.getProperties(), mountId);
mCloser.register(ufs);
}
return ufs;
}

@Override
Expand Down
Expand Up @@ -130,7 +130,7 @@ protected void checkFileInAlluxio(AlluxioURI filePath, int fileLen) throws Excep
protected void checkFileInUnderStorage(AlluxioURI filePath, int fileLen) throws Exception {
URIStatus status = mFileSystem.getStatus(filePath);
String checkpointPath = status.getUfsPath();
UnderFileSystem ufs = UnderFileSystem.Factory.getForJournal(checkpointPath);
UnderFileSystem ufs = UnderFileSystem.Factory.get(checkpointPath);

try (InputStream is = ufs.open(checkpointPath)) {
byte[] res = new byte[(int) status.getLength()];
Expand Down
Expand Up @@ -50,7 +50,7 @@ public final class JournalCheckpointThreadTest {
public void before() throws Exception {
URI location = URIUtils
.appendPathOrDie(new URI(mFolder.newFolder().getAbsolutePath()), "FileSystemMaster");
mUfs = Mockito.spy(UnderFileSystem.Factory.getForJournal(location));
mUfs = Mockito.spy(UnderFileSystem.Factory.get(location));
mJournal = new UfsJournal(location, mUfs);
}

Expand Down
Expand Up @@ -42,7 +42,7 @@ public final class UfsJournalCheckpointWriterTest {
public void before() throws Exception {
URI location = URIUtils
.appendPathOrDie(new URI(mFolder.newFolder().getAbsolutePath()), "FileSystemMaster");
mUfs = Mockito.spy(UnderFileSystem.Factory.getForJournal(location));
mUfs = Mockito.spy(UnderFileSystem.Factory.get(location));
mJournal = new UfsJournal(location, mUfs);
}

Expand Down

0 comments on commit 2647bd3

Please sign in to comment.