Skip to content

Commit

Permalink
[FLINK-11952][3/4] Integrate plugin mechanism with FileSystem
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Apr 15, 2019
1 parent 0c4953c commit c7b141b
Showing 1 changed file with 76 additions and 33 deletions.
109 changes: 76 additions & 33 deletions flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.local.LocalFileSystemFactory;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;
Expand All @@ -44,12 +45,14 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -217,11 +220,8 @@ public enum WriteMode {
/** Cache for file systems, by scheme + authority. */
private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>();

/** All available file system factories. */
private static final List<FileSystemFactory> RAW_FACTORIES = loadFileSystems();

/** Mapping of file system schemes to the corresponding factories,
* populated in {@link FileSystem#initialize(Configuration)}. */
* populated in {@link FileSystem#initialize(Configuration, PluginManager)}. */
private static final HashMap<String, FileSystemFactory> FS_FACTORIES = new HashMap<>();

/** The default factory that is used when no scheme matches. */
Expand Down Expand Up @@ -249,17 +249,57 @@ public enum WriteMode {
* A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
* {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}.
*
* @deprecated use {@link #initialize(Configuration, PluginManager)} instead.
*
* @param config the configuration from where to fetch the parameter.
*/
@Deprecated
public static void initialize(Configuration config) throws IllegalConfigurationException {
initializeWithoutPlugins(config);
}

private static void initializeWithoutPlugins(Configuration config) throws IllegalConfigurationException {
initialize(config, null);
}

/**
* Initializes the shared file system settings.
*
* <p>The given configuration is passed to each file system factory to initialize the respective
* file systems. Because the configuration of file systems may be different subsequent to the call
* of this method, this method clears the file system instance cache.
*
* <p>This method also reads the default file system URI from the configuration key
* {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where
* the URI has no scheme will be interpreted as relative to that URI.
* As an example, assume the default file system URI is set to {@code 'hdfs://localhost:9000/'}.
* A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
* {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}.
*
* @param config the configuration from where to fetch the parameter.
* @param pluginManager optional plugin manager that is used to initialized filesystems provided as plugins.
*/
public static void initialize(
Configuration config,
PluginManager pluginManager) throws IllegalConfigurationException {

LOCK.lock();
try {
// make sure file systems are re-instantiated after re-configuration
CACHE.clear();
FS_FACTORIES.clear();

Collection<Supplier<Iterator<FileSystemFactory>>> factorySuppliers = new ArrayList<>(2);
factorySuppliers.add(() -> ServiceLoader.load(FileSystemFactory.class).iterator());

if (pluginManager != null) {
factorySuppliers.add(() -> pluginManager.load(FileSystemFactory.class));
}

final List<FileSystemFactory> fileSystemFactories = loadFileSystemFactories(factorySuppliers);

// configure all file system factories
for (FileSystemFactory factory : RAW_FACTORIES) {
for (FileSystemFactory factory : fileSystemFactories) {
factory.configure(config);
String scheme = factory.getScheme();

Expand Down Expand Up @@ -384,7 +424,7 @@ public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOExcept
// even when not configured with an explicit Flink configuration, like on
// JobManager or TaskManager setup
if (FS_FACTORIES.isEmpty()) {
initialize(new Configuration());
initializeWithoutPlugins(new Configuration());
}

// Try to create a new file system
Expand Down Expand Up @@ -944,46 +984,49 @@ public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean crea
*
* @return A map from the file system scheme to corresponding file system factory.
*/
private static List<FileSystemFactory> loadFileSystems() {
private static List<FileSystemFactory> loadFileSystemFactories(
Collection<Supplier<Iterator<FileSystemFactory>>> factoryIteratorsSuppliers) {

final ArrayList<FileSystemFactory> list = new ArrayList<>();

// by default, we always have the local file system factory
list.add(new LocalFileSystemFactory());

LOG.debug("Loading extension file systems via services");

try {
ServiceLoader<FileSystemFactory> serviceLoader = ServiceLoader.load(FileSystemFactory.class);
Iterator<FileSystemFactory> iter = serviceLoader.iterator();

// we explicitly use an iterator here (rather than for-each) because that way
// we can catch errors in individual service instantiations

//noinspection WhileLoopReplaceableByForEach
while (iter.hasNext()) {
try {
FileSystemFactory factory = iter.next();
list.add(factory);
LOG.debug("Added file system {}:{}", factory.getScheme(), factory.getClass().getName());
}
catch (Throwable t) {
// catching Throwable here to handle various forms of class loading
// and initialization errors
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
LOG.error("Failed to load a file system via services", t);
}
for (Supplier<Iterator<FileSystemFactory>> factoryIteratorsSupplier : factoryIteratorsSuppliers) {
try {
addAllFactoriesToList(factoryIteratorsSupplier.get(), list);
} catch (Throwable t) {
// catching Throwable here to handle various forms of class loading
// and initialization errors
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
LOG.error("Failed to load additional file systems via services", t);
}
}
catch (Throwable t) {
// catching Throwable here to handle various forms of class loading
// and initialization errors
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
LOG.error("Failed to load additional file systems via services", t);
}

return Collections.unmodifiableList(list);
}

private static void addAllFactoriesToList(Iterator<FileSystemFactory> iter, List<FileSystemFactory> list) {
// we explicitly use an iterator here (rather than for-each) because that way
// we can catch errors in individual service instantiations

while (iter.hasNext()) {
try {
FileSystemFactory factory = iter.next();
list.add(factory);
LOG.debug("Added file system {}:{}", factory.getScheme(), factory.getClass().getName());
}
catch (Throwable t) {
// catching Throwable here to handle various forms of class loading
// and initialization errors
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
LOG.error("Failed to load a file system via services", t);
}
}
}

/**
* Utility loader for the Hadoop file system factory.
* We treat the Hadoop FS factory in a special way, because we use it as a catch
Expand Down

0 comments on commit c7b141b

Please sign in to comment.