diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index e7a37654b8a4e..d159e70e885d6 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -31,6 +31,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.core.fs.local.LocalFileSystemFactory; +import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; @@ -44,12 +45,14 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.ServiceLoader; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -217,11 +220,8 @@ public enum WriteMode { /** Cache for file systems, by scheme + authority. */ private static final HashMap CACHE = new HashMap<>(); - /** All available file system factories. */ - private static final List RAW_FACTORIES = loadFileSystems(); - /** Mapping of file system schemes to the corresponding factories, - * populated in {@link FileSystem#initialize(Configuration)}. */ + * populated in {@link FileSystem#initialize(Configuration, PluginManager)}. */ private static final HashMap FS_FACTORIES = new HashMap<>(); /** The default factory that is used when no scheme matches. */ @@ -249,17 +249,57 @@ public enum WriteMode { * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as * {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}. * + * @deprecated use {@link #initialize(Configuration, PluginManager)} instead. + * * @param config the configuration from where to fetch the parameter. */ + @Deprecated public static void initialize(Configuration config) throws IllegalConfigurationException { + initializeWithoutPlugins(config); + } + + private static void initializeWithoutPlugins(Configuration config) throws IllegalConfigurationException { + initialize(config, null); + } + + /** + * Initializes the shared file system settings. + * + *

The given configuration is passed to each file system factory to initialize the respective + * file systems. Because the configuration of file systems may be different subsequent to the call + * of this method, this method clears the file system instance cache. + * + *

This method also reads the default file system URI from the configuration key + * {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where + * the URI has no scheme will be interpreted as relative to that URI. + * As an example, assume the default file system URI is set to {@code 'hdfs://localhost:9000/'}. + * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as + * {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}. + * + * @param config the configuration from where to fetch the parameter. + * @param pluginManager optional plugin manager that is used to initialized filesystems provided as plugins. + */ + public static void initialize( + Configuration config, + PluginManager pluginManager) throws IllegalConfigurationException { + LOCK.lock(); try { // make sure file systems are re-instantiated after re-configuration CACHE.clear(); FS_FACTORIES.clear(); + Collection>> factorySuppliers = new ArrayList<>(2); + factorySuppliers.add(() -> ServiceLoader.load(FileSystemFactory.class).iterator()); + + if (pluginManager != null) { + factorySuppliers.add(() -> pluginManager.load(FileSystemFactory.class)); + } + + final List fileSystemFactories = loadFileSystemFactories(factorySuppliers); + // configure all file system factories - for (FileSystemFactory factory : RAW_FACTORIES) { + for (FileSystemFactory factory : fileSystemFactories) { factory.configure(config); String scheme = factory.getScheme(); @@ -384,7 +424,7 @@ public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOExcept // even when not configured with an explicit Flink configuration, like on // JobManager or TaskManager setup if (FS_FACTORIES.isEmpty()) { - initialize(new Configuration()); + initializeWithoutPlugins(new Configuration()); } // Try to create a new file system @@ -944,7 +984,9 @@ public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean crea * * @return A map from the file system scheme to corresponding file system factory. */ - private static List loadFileSystems() { + private static List loadFileSystemFactories( + Collection>> factoryIteratorsSuppliers) { + final ArrayList list = new ArrayList<>(); // by default, we always have the local file system factory @@ -952,38 +994,39 @@ private static List loadFileSystems() { LOG.debug("Loading extension file systems via services"); - try { - ServiceLoader serviceLoader = ServiceLoader.load(FileSystemFactory.class); - Iterator iter = serviceLoader.iterator(); - - // we explicitly use an iterator here (rather than for-each) because that way - // we can catch errors in individual service instantiations - - //noinspection WhileLoopReplaceableByForEach - while (iter.hasNext()) { - try { - FileSystemFactory factory = iter.next(); - list.add(factory); - LOG.debug("Added file system {}:{}", factory.getScheme(), factory.getClass().getName()); - } - catch (Throwable t) { - // catching Throwable here to handle various forms of class loading - // and initialization errors - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - LOG.error("Failed to load a file system via services", t); - } + for (Supplier> factoryIteratorsSupplier : factoryIteratorsSuppliers) { + try { + addAllFactoriesToList(factoryIteratorsSupplier.get(), list); + } catch (Throwable t) { + // catching Throwable here to handle various forms of class loading + // and initialization errors + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + LOG.error("Failed to load additional file systems via services", t); } } - catch (Throwable t) { - // catching Throwable here to handle various forms of class loading - // and initialization errors - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - LOG.error("Failed to load additional file systems via services", t); - } return Collections.unmodifiableList(list); } + private static void addAllFactoriesToList(Iterator iter, List list) { + // we explicitly use an iterator here (rather than for-each) because that way + // we can catch errors in individual service instantiations + + while (iter.hasNext()) { + try { + FileSystemFactory factory = iter.next(); + list.add(factory); + LOG.debug("Added file system {}:{}", factory.getScheme(), factory.getClass().getName()); + } + catch (Throwable t) { + // catching Throwable here to handle various forms of class loading + // and initialization errors + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + LOG.error("Failed to load a file system via services", t); + } + } + } + /** * Utility loader for the Hadoop file system factory. * We treat the Hadoop FS factory in a special way, because we use it as a catch