Skip to content
Permalink
Browse files
DRILL-6614: Allow usage of MapRDBFormatPlugin for HiveStoragePlugin
  • Loading branch information
vdiravka committed Jul 19, 2018
1 parent 92fbed8 commit 85344abd1ddb73448bdf67cdc6883cb98795a910
Showing 7 changed files with 51 additions and 23 deletions.
@@ -32,11 +32,11 @@
import java.util.concurrent.TimeUnit;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.dfs.FileSystemConfig;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -249,8 +249,8 @@ public String getDigest() {
}

@JsonProperty("storage")
public FileSystemConfig getStorageConfig() {
return (FileSystemConfig) storagePlugin.getConfig();
public StoragePluginConfig getStorageConfig() {
return storagePlugin.getConfig();
}

@JsonIgnore
@@ -124,21 +124,15 @@ private DrillScanRel createNativeScanRel(final DrillScanRel hiveScanRel) throws
Map<String, String> parameters = hiveScan.getHiveReadEntry().getHiveTableWrapper().getParameters();

JsonScanSpec scanSpec = new JsonScanSpec(parameters.get(MAPRDB_TABLE_NAME), null);
MapRDBFormatPlugin mapRDBFormatPlugin = new MapRDBFormatPlugin(
"hive-maprdb",
hiveScan.getStoragePlugin().getContext(),
hiveScan.getHiveConf(),
hiveScan.getStoragePlugin().getConfig(),
new MapRDBFormatPluginConfig()
);
List<SchemaPath> hiveScanCols = hiveScanRel.getColumns().stream()
.map(colNameSchemaPath -> replaceOverriddenSchemaPath(parameters, colNameSchemaPath))
.collect(Collectors.toList());
JsonTableGroupScan nativeMapRDBScan =
new JsonTableGroupScan(
hiveScan.getUserName(),
hiveScan.getStoragePlugin(),
mapRDBFormatPlugin,
// TODO: We should use Hive format plugins here, once it will be implemented. DRILL-6621
(MapRDBFormatPlugin) hiveScan.getStoragePlugin().getFormatPlugin(new MapRDBFormatPluginConfig()),
scanSpec,
hiveScanCols
);
@@ -37,6 +37,7 @@
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
@@ -49,10 +50,13 @@
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -62,6 +66,8 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {

private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class);

public static final String HIVE_MAPRDB_FORMAT_PLUGIN_NAME = "hive-maprdb";

private final HiveStoragePluginConfig config;
private HiveSchemaFactory schemaFactory;
private final HiveConf hiveConf;
@@ -216,4 +222,19 @@ public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesC
return ruleBuilder.build();
}

@Override
public FormatPlugin getFormatPlugin(FormatPluginConfig formatConfig) {
// TODO: implement formatCreator similar to FileSystemPlugin formatCreator. DRILL-6621
if (formatConfig instanceof MapRDBFormatPluginConfig) {
try {
return new MapRDBFormatPlugin(HIVE_MAPRDB_FORMAT_PLUGIN_NAME, context, hiveConf, config,
(MapRDBFormatPluginConfig) formatConfig);
} catch (IOException e) {
throw new DrillRuntimeException("The error is occurred while connecting to MapR-DB", e);
}
}
throw new DrillRuntimeException(String.format("Hive storage plugin doesn't support usage of %s format plugin",
formatConfig.getClass().getName()));
}

}
@@ -24,20 +24,22 @@
import org.apache.calcite.plan.RelOptRule;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.planner.PlannerPhase;

import com.google.common.collect.ImmutableSet;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.store.dfs.FormatPlugin;

/** Abstract class for StorePlugin implementations.
* See StoragePlugin for description of the interface intent and its methods.
*/
public abstract class AbstractStoragePlugin implements StoragePlugin {

private final DrillbitContext context;
protected final DrillbitContext context;
private final String name;

protected AbstractStoragePlugin(DrillbitContext inContext, String inName) {
@@ -130,11 +132,17 @@ public void start() throws IOException { }
@Override
public void close() throws Exception { }

@Override
public FormatPlugin getFormatPlugin(FormatPluginConfig config) {
throw new UnsupportedOperationException(String.format("%s doesn't support format plugins", getClass().getName()));
}

public DrillbitContext getContext() {
return context;
}

public String getName() {
return name;
}

}
@@ -24,10 +24,12 @@
import org.apache.calcite.plan.RelOptRule;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.store.dfs.FormatPlugin;

/** Interface for all implementations of the storage plugins. Different implementations of the storage
* formats will implement methods that indicate if Drill can write or read its tables from that format,
@@ -101,4 +103,13 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable {
* Initialize the storage plugin. The storage plugin will not be used until this method is called.
*/
void start() throws IOException;

/**
* Allows to get the format plugin for current storage plugin based on appropriate format plugin config usage.
*
* @param config format plugin config
* @return format plugin instance
* @throws UnsupportedOperationException, if storage plugin doesn't support format plugins.
*/
FormatPlugin getFormatPlugin(FormatPluginConfig config);
}
@@ -49,7 +49,6 @@
import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.ischema.InfoSchemaConfig;
import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin;
@@ -327,14 +326,8 @@ public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetup
@Override
public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig)
throws ExecutionSetupException {
StoragePlugin p = getPlugin(storageConfig);
if (!(p instanceof FileSystemPlugin)) {
throw new ExecutionSetupException(
String.format("You tried to request a format plugin for a storage plugin that wasn't of type "
+ "FileSystemPlugin. The actual type of plugin was %s.", p.getClass().getName()));
}
FileSystemPlugin storage = (FileSystemPlugin) p;
return storage.getFormatPlugin(formatConfig);
StoragePlugin storagePlugin = getPlugin(storageConfig);
return storagePlugin.getFormatPlugin(formatConfig);
}

private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
@@ -152,6 +152,7 @@ public FormatPlugin getFormatPlugin(String name) {
* @param config format plugin configuration
* @return format plugin for given configuration if found, null otherwise
*/
@Override
public FormatPlugin getFormatPlugin(FormatPluginConfig config) {
if (config instanceof NamedFormatPluginConfig) {
return formatCreator.getFormatPluginByName(((NamedFormatPluginConfig) config).name);
@@ -167,9 +168,9 @@ public FormatPlugin getFormatPlugin(FormatPluginConfig config) {
@Override
public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
Builder<StoragePluginOptimizerRule> setBuilder = ImmutableSet.builder();
for(FormatPlugin plugin : formatCreator.getConfiguredFormatPlugins()){
for (FormatPlugin plugin : formatCreator.getConfiguredFormatPlugins()) {
Set<StoragePluginOptimizerRule> rules = plugin.getOptimizerRules();
if(rules != null && rules.size() > 0){
if (rules != null && rules.size() > 0) {
setBuilder.addAll(rules);
}
}

0 comments on commit 85344ab

Please sign in to comment.