Skip to content

Commit

Permalink
DRILL-7030: Make format plugins fully pluggable
Browse files Browse the repository at this point in the history
- Bootstrap files for format plugins were introduced and added to the existing plugins in contrib.
- Formats from these files are being added dynamically to the corresponding storage plugins.

closes #1780
  • Loading branch information
agozhiy authored and Ben-Zvi committed May 8, 2019
1 parent ba9837e commit edbfd64
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 16 deletions.
@@ -0,0 +1,26 @@
{
"storage":{
"dfs": {
"type": "file",
"formats": {
"ltsv": {
"type": "ltsv",
"extensions": [
"ltsv"
]
}
}
},
"s3": {
"type": "file",
"formats": {
"ltsv": {
"type": "ltsv",
"extensions": [
"ltsv"
]
}
}
}
}
}
@@ -0,0 +1,20 @@
{
"storage":{
"dfs": {
"type": "file",
"formats": {
"maprdb": {
"type": "maprdb"
}
}
},
"s3": {
"type": "file",
"formats": {
"maprdb": {
"type": "maprdb"
}
}
}
}
}
@@ -0,0 +1,26 @@
{
"storage":{
"dfs": {
"type": "file",
"formats": {
"syslog": {
"type": "syslog",
"extensions": [
"syslog"
]
}
}
},
"s3": {
"type": "file",
"formats": {
"syslog": {
"type": "syslog",
"extensions": [
"syslog"
]
}
}
}
}
}
Expand Up @@ -726,6 +726,7 @@ private ExecConstants() {
new OptionDescription("Min width for text readers, mostly for testing."));

public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json";
public static final String BOOTSTRAP_FORMAT_PLUGINS_FILE = "bootstrap-format-plugins.json";

public static final String SKIP_RUNTIME_ROWGROUP_PRUNING_KEY = "exec.storage.skip_runtime_rowgroup_pruning";
public static final OptionValidator SKIP_RUNTIME_ROWGROUP_PRUNING = new BooleanValidator(SKIP_RUNTIME_ROWGROUP_PRUNING_KEY,
Expand Down
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.config.LogicalPlanPersistence;
Expand Down Expand Up @@ -300,7 +301,8 @@ private PersistentStore<StoragePluginConfig> initPluginsSystemTable(DrillbitCont
}

/**
* Read bootstrap storage plugins {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE} files for the first fresh
* Read bootstrap storage plugins {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE}
* and format plugins {@link ExecConstants#BOOTSTRAP_FORMAT_PLUGINS_FILE} files for the first fresh
* instantiating of Drill
*
* @param lpPersistence deserialization mapper provider
Expand All @@ -310,22 +312,19 @@ private PersistentStore<StoragePluginConfig> initPluginsSystemTable(DrillbitCont
private StoragePlugins loadBootstrapPlugins(LogicalPlanPersistence lpPersistence) throws IOException {
// bootstrap load the config since no plugins are stored.
logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration.");
Set<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
if (urls != null && !urls.isEmpty()) {
logger.info("Loading the storage plugin configs from URLs {}.", urls);
Set<URL> storageUrls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false);
Set<URL> formatUrls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_FORMAT_PLUGINS_FILE, false);
if (storageUrls != null && !storageUrls.isEmpty()) {
logger.info("Loading the storage plugin configs from URLs {}.", storageUrls);
StoragePlugins bootstrapPlugins = new StoragePlugins(new HashMap<>());
Map<String, URL> pluginURLMap = new HashMap<>();
for (URL url : urls) {
String pluginsData = Resources.toString(url, Charsets.UTF_8);
StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
for (Entry<String, StoragePluginConfig> plugin : plugins) {
StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue());
if (oldPluginConfig != null) {
logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.",
plugin.getKey(), pluginURLMap.get(plugin.getKey()), url);
} else {
pluginURLMap.put(plugin.getKey(), url);
}
for (URL url : storageUrls) {
loadStoragePlugins(url, bootstrapPlugins, pluginURLMap, lpPersistence);
}
if (formatUrls != null && !formatUrls.isEmpty()) {
logger.info("Loading the format plugin configs from URLs {}.", formatUrls);
for (URL url : formatUrls) {
loadFormatPlugins(url, bootstrapPlugins, pluginURLMap, lpPersistence);
}
}
return bootstrapPlugins;
Expand All @@ -334,6 +333,65 @@ private StoragePlugins loadBootstrapPlugins(LogicalPlanPersistence lpPersistence
}
}

/**
* Loads storage plugins from the given URL
*
* @param url URL to the storage plugins bootstrap file
* @param bootstrapPlugins a collection where the plugins should be loaded to
* @param pluginURLMap a map to store correspondence between storage plugins and bootstrap files in which they are defined. Used for logging
* @param lpPersistence need to get an object mapper for the bootstrap files
* @throws IOException if failed to retrieve a plugin from a bootstrap file
*/
private void loadStoragePlugins(URL url, StoragePlugins bootstrapPlugins, Map<String, URL> pluginURLMap, LogicalPlanPersistence lpPersistence) throws IOException {
StoragePlugins plugins = getPluginsFromResource(url, lpPersistence);
plugins.forEach(plugin -> {
StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue());
if (oldPluginConfig != null) {
logger.warn("Duplicate plugin instance '[{}]' defined in [{}, {}], ignoring the later one.",
plugin.getKey(), pluginURLMap.get(plugin.getKey()), url);
} else {
pluginURLMap.put(plugin.getKey(), url);
}
});
}

/**
* Loads format plugins from the given URL and adds the formats to the specified storage plugins
*
* @param url URL to the format plugins bootstrap file
* @param bootstrapPlugins a collection with loaded storage plugins. New formats will be added to them
* @param pluginURLMap a map to store correspondence between storage plugins and bootstrap files in which they are defined. Used for logging
* @param lpPersistence need to get an object mapper for the bootstrap files
* @throws IOException if failed to retrieve a plugin from a bootstrap file
*/
private void loadFormatPlugins(URL url, StoragePlugins bootstrapPlugins, Map<String, URL> pluginURLMap, LogicalPlanPersistence lpPersistence) throws IOException {
StoragePlugins plugins = getPluginsFromResource(url, lpPersistence);
plugins.forEach(formatPlugin -> {
String targetStoragePluginName = formatPlugin.getKey();
StoragePluginConfig storagePlugin = bootstrapPlugins.getConfig(targetStoragePluginName);
StoragePluginConfig formatPluginValue = formatPlugin.getValue();
if (storagePlugin == null) {
logger.warn("No storage plugins with the given name are registered: '[{}]'", targetStoragePluginName);
} else if (storagePlugin instanceof FileSystemConfig && formatPluginValue instanceof FileSystemConfig) {
FileSystemConfig targetPlugin = (FileSystemConfig) storagePlugin;
((FileSystemConfig) formatPluginValue).getFormats().forEach((formatName, formatValue) -> {
FormatPluginConfig oldPluginConfig = targetPlugin.getFormats().putIfAbsent(formatName, formatValue);
if (oldPluginConfig != null) {
logger.warn("Duplicate format instance '[{}]' defined in [{}, {}], ignoring the later one.",
formatName, pluginURLMap.get(targetStoragePluginName), url);
}
});
} else {
logger.warn("Formats are only supported by File System plugin type: '[{}]'", targetStoragePluginName);
}
});
}

private StoragePlugins getPluginsFromResource(URL resource, LogicalPlanPersistence lpPersistence) throws IOException {
String pluginsData = Resources.toString(resource, Charsets.UTF_8);
return lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class);
}

/**
* Dynamically loads system plugins annotated with {@link SystemPlugin}.
* Will skip plugin initialization if no matching constructor, incorrect class implementation, name absence are detected.
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.dfs;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -48,7 +49,7 @@ public FileSystemConfig(@JsonProperty("connection") String connection,
Map<String, WorkspaceConfig> caseInsensitiveWorkspaces = CaseInsensitiveMap.newHashMap();
Optional.ofNullable(workspaces).ifPresent(caseInsensitiveWorkspaces::putAll);
this.workspaces = caseInsensitiveWorkspaces;
this.formats = formats;
this.formats = formats != null ? formats : new LinkedHashMap<>();
}

@JsonProperty
Expand Down

0 comments on commit edbfd64

Please sign in to comment.