Skip to content

Commit

Permalink
[ALLUXIO-3104] Make factory caching optional (#6881)
Browse files Browse the repository at this point in the history
* [ALLUXIO-3104] Make factory caching optional

* [ALLUXIO-3104] Simplify factory registry code

* Update javadocs
  • Loading branch information
gpang authored and apc999 committed Feb 16, 2018
1 parent f60aaa5 commit b7bc21b
Showing 1 changed file with 58 additions and 57 deletions.
Expand Up @@ -31,10 +31,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -86,9 +84,12 @@
public final class UnderFileSystemFactoryRegistry {
private static final Logger LOG = LoggerFactory.getLogger(UnderFileSystemFactoryRegistry.class);

// Key: absolute path to jar file
private static final Set<String> LOADED_LIB_JARS = new HashSet<>();
private static final Set<String> LOADED_EXTENSION_JARS = new HashSet<>();
/**
* The base list of factories, which does not include any lib or extension factories. The only
* factories in the base list will be the LocalUFS factory, and any additional factories
* registered by tests. All other UFS factories will be discovered and service loaded when ufs
* creation occurs.
*/
private static final List<UnderFileSystemFactory> FACTORIES = new CopyOnWriteArrayList<>();

private static boolean sInit = false;
Expand All @@ -102,9 +103,9 @@ private UnderFileSystemFactoryRegistry() {}
}

/**
* Returns a read-only view of the available factories.
* Returns a read-only view of the available base factories.
*
* @return Read-only view of the available factories
* @return Read-only view of the available base factories
*/
public static List<UnderFileSystemFactory> available() {
return Collections.unmodifiableList(FACTORIES);
Expand All @@ -131,22 +132,15 @@ public static UnderFileSystemFactory find(String path) {
@Nullable
public static UnderFileSystemFactory find(
String path, @Nullable UnderFileSystemConfiguration ufsConf) {
Preconditions.checkArgument(path != null, "path may not be null");

scanLibs();
scanExtensions();

for (UnderFileSystemFactory factory : FACTORIES) {
if (factory.supportsPath(path, ufsConf)) {
LOG.debug("Selected Under File System Factory implementation {} for path {}",
factory.getClass(), path);
return factory;
}
List<UnderFileSystemFactory> factories = findAll(path, ufsConf);
if (factories.isEmpty()) {
LOG.warn("No Under File System Factory implementation supports the path {}. Please check if "
+ "the under storage path is valid.", path);
return null;
}

LOG.warn("No Under File System Factory implementation supports the path {}. Please check if "
+ "the under storage path is valid.", path);
return null;
LOG.debug("Selected Under File System Factory implementation {} for path {}",
factories.get(0).getClass(), path);
return factories.get(0);
}

/**
Expand All @@ -156,15 +150,16 @@ public static UnderFileSystemFactory find(
* @param ufsConf configuration of the UFS
* @return list of factories that support the given path which may be an empty list
*/
public static List<UnderFileSystemFactory> findAll(
String path, UnderFileSystemConfiguration ufsConf) {
public static List<UnderFileSystemFactory> findAll(String path,
UnderFileSystemConfiguration ufsConf) {
Preconditions.checkArgument(path != null, "path may not be null");

scanLibs();
scanExtensions();
List<UnderFileSystemFactory> factories = new ArrayList<>(FACTORIES);
scanLibs(factories);
scanExtensions(factories);

List<UnderFileSystemFactory> eligibleFactories = new ArrayList<>();
for (UnderFileSystemFactory factory : FACTORIES) {
for (UnderFileSystemFactory factory : factories) {
if (factory.supportsPath(path, ufsConf)) {
LOG.debug("Under File System Factory implementation {} is eligible for path {}",
factory.getClass(), path);
Expand All @@ -179,17 +174,21 @@ public static List<UnderFileSystemFactory> findAll(
}

/**
* Finds all {@link UnderFileSystemFactory} from the extensions directory and caches.
* Finds all {@link UnderFileSystemFactory} from the extensions directory.
*
* @param factories list of factories to add to
*/
private static void scanExtensions() {
private static void scanExtensions(List<UnderFileSystemFactory> factories) {
LOG.info("Loading extension UFS jars from {}", Configuration.get(PropertyKey.EXTENSIONS_DIR));
scan(Arrays.asList(ExtensionUtils.listExtensions()), LOADED_EXTENSION_JARS);
scan(Arrays.asList(ExtensionUtils.listExtensions()), factories);
}

/**
* Finds all {@link UnderFileSystemFactory} from the lib directory and caches.
* Finds all {@link UnderFileSystemFactory} from the lib directory.
*
* @param factories list of factories to add to
*/
private static void scanLibs() {
private static void scanLibs(List<UnderFileSystemFactory> factories) {
LOG.info("Loading core UFS jars from {}", RuntimeConstants.LIB_DIR);
List<File> files = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(
Expand All @@ -202,32 +201,29 @@ private static void scanLibs() {
} catch (IOException e) {
LOG.warn("Failed to load UFS libs: {}", e.getMessage());
}
scan(files, LOADED_LIB_JARS);
scan(files, factories);
}

/**
* Class-loads jar files that have not been loaded.
*
* @param files jar files to class-load
* @param loadedJars jars already loaded under this dir
* @param factories list of factories to add to
*/
private static void scan(List<File> files, Set<String> loadedJars) {
private static void scan(List<File> files, List<UnderFileSystemFactory> factories) {
for (File jar : files) {
try {
URL extensionURL = jar.toURI().toURL();
String jarPath = extensionURL.toString();
if (!loadedJars.contains(jarPath)) {
ClassLoader extensionsClassLoader = new ExtensionsClassLoader(new URL[] {extensionURL},
ClassLoader.getSystemClassLoader());
ServiceLoader<UnderFileSystemFactory> extensionServiceLoader =
ServiceLoader.load(UnderFileSystemFactory.class, extensionsClassLoader);
for (UnderFileSystemFactory factory : extensionServiceLoader) {
LOG.debug("Discovered an Under File System Factory implementation {} - {} in jar {}",
factory.getClass(), factory.toString(), jarPath);
// Cache
register(factory);
loadedJars.add(jarPath);
}

ClassLoader extensionsClassLoader = new ExtensionsClassLoader(new URL[] {extensionURL},
ClassLoader.getSystemClassLoader());
ServiceLoader<UnderFileSystemFactory> extensionServiceLoader =
ServiceLoader.load(UnderFileSystemFactory.class, extensionsClassLoader);
for (UnderFileSystemFactory factory : extensionServiceLoader) {
LOG.debug("Discovered an Under File System Factory implementation {} - {} in jar {}",
factory.getClass(), factory.toString(), jarPath);
register(factory, factories);
}
} catch (Throwable t) {
LOG.warn("Failed to load jar {}: {}", jar, t.getMessage());
Expand All @@ -240,19 +236,16 @@ private static synchronized void init() {
return;
}

// Discover and register the available factories
// Discover and register the available base factories
ServiceLoader<UnderFileSystemFactory> discoveredFactories =
ServiceLoader.load(UnderFileSystemFactory.class,
UnderFileSystemFactory.class.getClassLoader());
for (UnderFileSystemFactory factory : discoveredFactories) {
LOG.debug("Discovered Under File System Factory implementation {} - {}", factory.getClass(),
factory.toString());
LOG.debug("Discovered base Under File System Factory implementation {} - {}",
factory.getClass(), factory.toString());
register(factory);
}

scanLibs();
scanExtensions();

sInit = true;
}

Expand All @@ -269,6 +262,11 @@ private static synchronized void init() {
* @param factory factory to register
*/
public static void register(UnderFileSystemFactory factory) {
register(factory, FACTORIES);
}

private static void register(UnderFileSystemFactory factory,
List<UnderFileSystemFactory> factories) {
if (factory == null) {
return;
}
Expand All @@ -278,7 +276,7 @@ public static void register(UnderFileSystemFactory factory) {

// Insert at start of list so it will take precedence over automatically discovered and
// previously registered factories
FACTORIES.add(0, factory);
factories.add(0, factory);
}

/**
Expand All @@ -292,8 +290,6 @@ public static synchronized void reset() {
// Reset state
sInit = false;
FACTORIES.clear();
LOADED_LIB_JARS.clear();
LOADED_EXTENSION_JARS.clear();
}

// Reinitialise
Expand All @@ -306,12 +302,17 @@ public static synchronized void reset() {
* @param factory factory to unregister
*/
public static void unregister(UnderFileSystemFactory factory) {
unregister(factory, FACTORIES);
}

private static void unregister(UnderFileSystemFactory factory,
List<UnderFileSystemFactory> factories) {
if (factory == null) {
return;
}

LOG.debug("Unregistered Under File System Factory implementation {} - {}", factory.getClass(),
factory.toString());
FACTORIES.remove(factory);
factories.remove(factory);
}
}

0 comments on commit b7bc21b

Please sign in to comment.