diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
index 7a0a64b2c85..1c407e135c6 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java
@@ -46,7 +46,7 @@ public HBaseSchemaFactory(HBaseStoragePlugin plugin, String name) throws IOExcep
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     HBaseSchema schema = new HBaseSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index c10b0abd0d7..948d4623aaa 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -66,7 +66,7 @@ public HBaseGroupScan getPhysicalScan(JSONOptions selection) throws IOException
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     schemaFactory.registerSchemas(session, parent);
   }
 
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index f4baf3b023f..91e7a92b5c9 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -82,7 +82,7 @@ public HiveScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns)
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     schemaFactory.registerSchemas(session, parent);
   }
 
   public Set<StoragePluginOptimizerRule> getOptimizerRules() {
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 0e16e6f533a..587e90d3cb9 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.hive.schema;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -186,7 +187,7 @@ public HiveReadEntry load(String key) throws Exception {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     HiveSchema schema = new HiveSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
index e46d8ec5897..dfad5ef46fd 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoStoragePlugin.java
@@ -63,7 +63,7 @@ public MongoStoragePluginConfig getConfig() {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     schemaFactory.registerSchemas(session, parent);
   }
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
index 3c706386f98..f650cccdcdb 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/schema/MongoSchemaFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mongo.schema;
 
+import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,7 +37,6 @@
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.SchemaFactory;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
 import org.apache.drill.exec.store.mongo.MongoCnxnManager;
 import org.apache.drill.exec.store.mongo.MongoScanSpec;
 import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
@@ -120,7 +120,7 @@ public List<String> load(String dbName) throws Exception {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     MongoSchema schema = new MongoSchema(schemaName);
     SchemaPlus hPlus = parent.add(schemaName, schema);
     schema.setHolder(hPlus);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 9400355034f..7dfd0e66e31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -24,7 +24,6 @@
 import java.util.Map;
 
 import net.hydromatic.optiq.SchemaPlus;
-import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
@@ -64,7 +63,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
   private final Map<DrillbitEndpoint, DataTunnel> tunnels = Maps.newHashMap();
 
   private final DrillbitContext context;
-  private final UserClientConnection connection; // is null if attached to non-root fragment
+  private final UserClientConnection connection; // is null if this context is for a non-root fragment
+  private final QueryContext queryContext; // is null if this context is for a non-root fragment
   private final FragmentStats stats;
   private final FunctionImplementationRegistry funcRegistry;
   private final BufferAllocator allocator;
@@ -87,10 +87,34 @@ public void accept(final RpcException e) {
   private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
   private final AccountingUserConnection accountingUserConnection;
 
+  /**
+   * Create a FragmentContext instance for a non-root fragment.
+   *
+   * @param dbContext DrillbitContext.
+   * @param fragment Fragment implementation.
+   * @param funcRegistry FunctionImplementationRegistry.
+   * @throws ExecutionSetupException
+   */
   public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
+      final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+    this(dbContext, fragment, null, null, funcRegistry);
+  }
+
+  /**
+   * Create a FragmentContext instance for the root fragment.
+   *
+   * @param dbContext DrillbitContext.
+   * @param fragment Fragment implementation.
+   * @param queryContext QueryContext.
+   * @param connection UserClientConnection.
+   * @param funcRegistry FunctionImplementationRegistry.
+   * @throws ExecutionSetupException
+   */
+  public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
       final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
     this.context = dbContext;
+    this.queryContext = queryContext;
     this.connection = connection;
     this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
     this.fragment = fragment;
@@ -128,6 +152,15 @@ public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragm
     bufferManager = new BufferManager(this.allocator, this);
   }
 
+  /**
+   * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
+   * the long list of test files.
+   */
+  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
+      FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
+    this(dbContext, fragment, null, connection, funcRegistry);
+  }
+
   public OptionManager getOptions() {
     return fragmentOptions;
   }
@@ -162,15 +195,13 @@ public DrillbitContext getDrillbitContext() {
   }
 
   public SchemaPlus getRootSchema() {
-    if (connection == null) {
+    if (queryContext == null) {
       fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
" + "This is a non-root fragment.")); return null; } - final SchemaPlus root = SimpleOptiqSchema.createRootSchema(false); - context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root); - return root; + return queryContext.getRootSchema(); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index 2dcac25569f..cd5c0540719 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.ops; +import java.io.IOException; import java.util.Collection; import io.netty.buffer.DrillBuf; @@ -46,7 +47,7 @@ // TODO - consider re-name to PlanningContext, as the query execution context actually appears // in fragment contexts public class QueryContext implements AutoCloseable, UdfUtilities { -// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class); private final DrillbitContext drillbitContext; private final UserSession session; @@ -113,9 +114,15 @@ public SchemaPlus getNewDefaultSchema() { } public SchemaPlus getRootSchema() { - final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false); - drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema); - return rootSchema; + try { + final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false); + drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema); + return rootSchema; + } catch(IOException e) { + final String errMsg = String.format("Failed to create schema tree: %s", e.getMessage()); + logger.error(errMsg, e); + throw new DrillRuntimeException(errMsg, e); + } } public OptionManager getOptions() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java index feadabdd643..14d2fab5280 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java @@ -21,8 +21,10 @@ import org.apache.drill.exec.rpc.user.UserSession; +import java.io.IOException; + public interface SchemaFactory { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaFactory.class); - public void registerSchemas(UserSession session, SchemaPlus parent); + public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java index 5d0eed6a844..cb9ee0f1e8b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java @@ -301,7 +301,7 @@ public DrillSchemaFactory getSchemaFactory() { public class DrillSchemaFactory implements SchemaFactory { @Override - public void registerSchemas(UserSession session, SchemaPlus parent) { + public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException { Stopwatch watch = new Stopwatch(); watch.start(); diff --git 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 2f487d6637e..30c45fa40d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -31,6 +31,7 @@
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
 import java.util.List;
@@ -40,13 +41,13 @@
 */
 public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
 
-  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
+  public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
       StoragePluginConfig storagePluginConfig) {
-    this(name, context, fs, storagePluginConfig, new AvroFormatConfig());
+    this(name, context, fsConf, storagePluginConfig, new AvroFormatConfig());
   }
 
-  public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
-    super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
+  public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
+    super(name, context, fsConf, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
index 9756f3cbdc9..3768aea73d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -36,34 +37,32 @@
 public class BasicFormatMatcher extends FormatMatcher{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicFormatMatcher.class);
 
-  private final List<Pattern> patterns;
-  private final MagicStringMatcher matcher;
-
-  protected final DrillFileSystem fs;
   protected final FormatPlugin plugin;
   protected final boolean compressible;
   protected final CompressionCodecFactory codecFactory;
 
-  public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<Pattern> patterns, List<MagicString> magicStrings) {
+  private final List<Pattern> patterns;
+  private final MagicStringMatcher matcher;
+
+  public BasicFormatMatcher(FormatPlugin plugin, List<Pattern> patterns, List<MagicString> magicStrings) {
     super();
     this.patterns = ImmutableList.copyOf(patterns);
     this.matcher = new MagicStringMatcher(magicStrings);
-    this.fs = fs;
     this.plugin = plugin;
     this.compressible = false;
     this.codecFactory = null;
   }
 
-  public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<String> extensions, boolean compressible) {
+  public BasicFormatMatcher(FormatPlugin plugin, Configuration fsConf, List<String> extensions, boolean compressible) {
     List<Pattern> patterns = Lists.newArrayList();
     for (String extension : extensions) {
       patterns.add(Pattern.compile(".*\\." + extension));
     }
     this.patterns = patterns;
     this.matcher = new MagicStringMatcher(new ArrayList<MagicString>());
-    this.fs = fs;
     this.plugin = plugin;
     this.compressible = compressible;
-    this.codecFactory = new CompressionCodecFactory(fs.getConf());
+    this.codecFactory = new CompressionCodecFactory(fsConf);
   }
 
   @Override
@@ -72,8 +71,8 @@ public boolean supportDirectoryReads() {
   }
 
   @Override
-  public FormatSelection isReadable(FileSelection selection) throws IOException {
-    if (isReadable(selection.getFirstPath(fs))) {
+  public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
+    if (isReadable(fs, selection.getFirstPath(fs))) {
       if (plugin.getName() != null) {
         NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
         namedConfig.name = plugin.getName();
@@ -85,7 +84,7 @@ public FormatSelection isReadable(FileSelection selection) throws IOException {
     return null;
   }
 
-  protected final boolean isReadable(FileStatus status) throws IOException {
+  protected final boolean isReadable(DrillFileSystem fs, FileStatus status) throws IOException {
     CompressionCodec codec = null;
     if (compressible) {
       codec = codecFactory.getCodec(status.getPath());
@@ -103,7 +102,7 @@
       }
     }
 
-    if (matcher.matches(status)) {
+    if (matcher.matches(fs, status)) {
       return true;
     }
     return false;
@@ -128,7 +127,7 @@ public MagicStringMatcher(List<MagicString> magicStrings) {
      }
    }
 
-    public boolean matches(FileStatus status) throws IOException{
+    public boolean matches(DrillFileSystem fs, FileStatus status) throws IOException{
      if (ranges.isEmpty()) {
        return false;
      }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index 2683cca436a..f8afe3f393a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -96,11 +96,11 @@ public void run() {
   private final OperatorStats operatorStats;
 
   public DrillFileSystem(Configuration fsConf) throws IOException {
-    this(FileSystem.get(fsConf), null);
+    this(fsConf, null);
   }
 
-  public DrillFileSystem(FileSystem fs, OperatorStats operatorStats) {
-    this.underlyingFs = fs;
+  public DrillFileSystem(Configuration fsConf, OperatorStats operatorStats) throws IOException {
+    this.underlyingFs = FileSystem.get(fsConf);
     this.operatorStats = operatorStats;
   }
 
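With the DrillFileSystem change above, the wrapper no longer accepts a pre-built FileSystem: every instance is materialized from a Configuration, and FileSystem.get() runs inside the constructor, which is why the constructor now throws IOException. A minimal usage sketch, assuming a local-filesystem Configuration (the property values here are illustrative, not taken from the patch):

    Configuration fsConf = new Configuration();
    fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
    try {
      // Second argument is the OperatorStats to charge I/O against; null is allowed.
      DrillFileSystem dfs = new DrillFileSystem(fsConf, null);
      // ... use dfs ...
    } catch (IOException e) {
      // Execution-time callers in this patch rethrow this as ExecutionSetupException
      // (see the EasyFormatPlugin hunk below).
      throw new DrillRuntimeException(e);
    }
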
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index c5ca41b1607..775b402eb17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -44,6 +44,8 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import static org.apache.drill.exec.store.dfs.FileSystemSchemaFactory.DEFAULT_WS_NAME;
+
 /**
 * A Storage engine associated with a Hadoop FileSystem Implementation. Examples include HDFS, MapRFS, QuantacastFileSystem,
 * LocalFileSystem, as well Apache Drill specific CachedFileSystem, ClassPathFileSystem and LocalSyncableFileSystem.
@@ -51,26 +53,26 @@
 * references to the FileSystem configuration and path management.
 */
 public class FileSystemPlugin extends AbstractStoragePlugin{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
 
   private final FileSystemSchemaFactory schemaFactory;
-  private Map<String, FormatPlugin> formatPluginsByName;
-  private Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
-  private FileSystemConfig config;
-  private DrillbitContext context;
-  private final DrillFileSystem fs;
+  private final Map<String, FormatPlugin> formatPluginsByName;
+  private final Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
+  private final FileSystemConfig config;
+  private final DrillbitContext context;
+  private final Configuration fsConf;
 
   public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String name) throws ExecutionSetupException{
     try {
       this.config = config;
       this.context = context;
 
-      Configuration fsConf = new Configuration();
+      fsConf = new Configuration();
       fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.connection);
       fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
       fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
-      fs = new DrillFileSystem(fsConf);
-      formatPluginsByName = FormatCreator.getFormatPlugins(context, fs, config);
+
+      formatPluginsByName = FormatCreator.getFormatPlugins(context, fsConf, config);
       List<FormatMatcher> matchers = Lists.newArrayList();
       formatPluginsByConfig = Maps.newHashMap();
       for (FormatPlugin p : formatPluginsByName.values()) {
@@ -78,17 +80,17 @@ public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String
         formatPluginsByConfig.put(p.getConfig(), p);
       }
 
-      boolean noWorkspace = config.workspaces == null || config.workspaces.isEmpty();
+      final boolean noWorkspace = config.workspaces == null || config.workspaces.isEmpty();
       List<WorkspaceSchemaFactory> factories = Lists.newArrayList();
       if (!noWorkspace) {
         for (Map.Entry<String, WorkspaceConfig> space : config.workspaces.entrySet()) {
-          factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, space.getKey(), name, fs, space.getValue(), matchers));
+          factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, space.getKey(), name, space.getValue(), matchers));
         }
       }
 
       // if the "default" workspace is not given add one.
-      if (noWorkspace || !config.workspaces.containsKey("default")) {
-        factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, "default", name, fs, WorkspaceConfig.DEFAULT, matchers));
+      if (noWorkspace || !config.workspaces.containsKey(DEFAULT_WS_NAME)) {
+        factories.add(new WorkspaceSchemaFactory(context.getConfig(), this, DEFAULT_WS_NAME, name, WorkspaceConfig.DEFAULT, matchers));
       }
 
       this.schemaFactory = new FileSystemSchemaFactory(name, factories);
@@ -123,7 +125,7 @@ public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath>
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     schemaFactory.registerSchemas(session, parent);
   }
 
@@ -151,5 +153,7 @@ public Set<StoragePluginOptimizerRule> getOptimizerRules() {
     return setBuilder.build();
   }
 
-
+  public Configuration getFsConf() {
+    return fsConf;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 44132d035e2..e11712e8519 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -44,12 +44,12 @@
 * This is the top level schema that responds to root level path requests. Also supports
 */
 public class FileSystemSchemaFactory implements SchemaFactory{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchemaFactory.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchemaFactory.class);
+
+  public static final String DEFAULT_WS_NAME = "default";
 
   private List<WorkspaceSchemaFactory> factories;
   private String schemaName;
-  private final String defaultSchemaName = "default";
-
 
   public FileSystemSchemaFactory(String schemaName, List<WorkspaceSchemaFactory> factories) {
     super();
@@ -58,7 +58,7 @@
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     FileSystemSchema schema = new FileSystemSchema(schemaName, session);
     SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
     schema.setPlus(plusOfThis);
@@ -69,14 +69,14 @@ public class FileSystemSchema extends AbstractSchema {
 
     private final WorkspaceSchema defaultSchema;
     private final Map<String, WorkspaceSchema> schemaMap = Maps.newHashMap();
 
-    public FileSystemSchema(String name, UserSession session) {
+    public FileSystemSchema(String name, UserSession session) throws IOException {
       super(ImmutableList.<String>of(), name);
       for(WorkspaceSchemaFactory f : factories){
         WorkspaceSchema s = f.createSchema(getSchemaPath(), session);
         schemaMap.put(s.getName(), s);
       }
 
-      defaultSchema = schemaMap.get(defaultSchemaName);
+      defaultSchema = schemaMap.get(DEFAULT_WS_NAME);
     }
 
     void setPlus(SchemaPlus plusOfThis){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
index c164ed5f0e6..d2a903b4913 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
@@ -31,20 +31,23 @@
 import org.apache.drill.exec.server.DrillbitContext;
 
 import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
 
 public class FormatCreator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
 
-  static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class, FormatPluginConfig.class);
-  static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class);
+  private static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class,
+      Configuration.class, StoragePluginConfig.class, FormatPluginConfig.class);
+  private static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class,
+      Configuration.class, StoragePluginConfig.class);
 
-  static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, DrillFileSystem fileSystem, FileSystemConfig storageConfig) {
+  static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, Configuration fsConf,
+      FileSystemConfig storageConfig) {
     final DrillConfig config = context.getConfig();
     Map<String, FormatPlugin> plugins = Maps.newHashMap();
 
     Collection<Class<? extends FormatPlugin>> pluginClasses =
        PathScanner.scanForImplementations(FormatPlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
-
     if (storageConfig.formats == null || storageConfig.formats.isEmpty()) {
 
       for (Class<? extends FormatPlugin> pluginClass: pluginClasses) {
@@ -53,7 +56,7 @@
           if (!DEFAULT_BASED.check(c)) {
             continue;
           }
-          FormatPlugin plugin = (FormatPlugin) c.newInstance(null, context, fileSystem, storageConfig);
+          FormatPlugin plugin = (FormatPlugin) c.newInstance(null, context, fsConf, storageConfig);
           plugins.put(plugin.getName(), plugin);
         } catch (Exception e) {
           logger.warn(String.format("Failure while trying instantiate FormatPlugin %s.", pluginClass.getName()), e);
@@ -84,7 +87,7 @@ static Map getFormatPlugins(DrillbitContext context, Drill
             continue;
           }
           try {
-            plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fileSystem, storageConfig, e.getValue()));
+            plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fsConf, storageConfig, e.getValue()));
           } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) {
             logger.warn("Failure initializing storage config named '{}' of type '{}'.", e.getKey(), e.getValue().getClass().getName(), e1);
           }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
index 92e3d0a9909..0b8c7a8e36d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
@@ -23,6 +23,6 @@ public abstract class FormatMatcher {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatMatcher.class);
 
   public abstract boolean supportDirectoryReads();
-  public abstract FormatSelection isReadable(FileSelection selection) throws IOException;
+  public abstract FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException;
   public abstract FormatPlugin getFormatPlugin();
 }
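FormatCreator instantiates format plugins reflectively, so the two ConstructorChecker signatures above are effectively the plugin SPI: after this patch a format plugin must expose constructors that take a Configuration rather than a DrillFileSystem. A hypothetical third-party plugin migrated to the new shape (MyFormatPlugin/MyFormatConfig are made-up names; the pattern mirrors the Avro and JSON plugins in this patch):

    // Matches DEFAULT_BASED: (String, DrillbitContext, Configuration, StoragePluginConfig)
    public MyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
        StoragePluginConfig storageConfig) {
      this(name, context, fsConf, storageConfig, new MyFormatConfig());
    }

    // Matches FORMAT_BASED: the same prefix plus the FormatPluginConfig subtype
    public MyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
        StoragePluginConfig storageConfig, MyFormatConfig formatConfig) {
      super(name, context, fsConf, storageConfig, formatConfig,
          true, false, false, false, Lists.newArrayList("mine"), "myformat");
    }
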
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 58d5b426937..955dfeb817d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -29,6 +29,7 @@
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.hadoop.conf.Configuration;
 
 /**
 * Similar to a storage engine but built specifically to work within a FileSystem context.
@@ -51,7 +52,7 @@ public interface FormatPlugin {
   public FormatPluginConfig getConfig();
   public StoragePluginConfig getStorageConfig();
 
-  public DrillFileSystem getFileSystem();
+  public Configuration getFsConf();
   public DrillbitContext getContext();
   public String getName();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 45e91294b5d..a5363508e77 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -44,6 +44,7 @@
 import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
@@ -52,14 +53,14 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
+public class WorkspaceSchemaFactory {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
 
   private final List<FormatMatcher> fileMatchers;
   private final List<FormatMatcher> dirMatchers;
 
   private final WorkspaceConfig config;
-  private final DrillFileSystem fs;
+  private final Configuration fsConf;
   private final DrillConfig drillConfig;
   private final String storageEngineName;
   private final String schemaName;
@@ -67,9 +68,9 @@
   private final ObjectMapper mapper;
 
   public WorkspaceSchemaFactory(DrillConfig drillConfig, FileSystemPlugin plugin, String schemaName,
-      String storageEngineName, DrillFileSystem fileSystem, WorkspaceConfig config,
-      List<FormatMatcher> formatMatchers) throws ExecutionSetupException, IOException {
-    this.fs = fileSystem;
+      String storageEngineName, WorkspaceConfig config, List<FormatMatcher> formatMatchers)
+      throws ExecutionSetupException, IOException {
+    this.fsConf = plugin.getFsConf();
     this.plugin = plugin;
     this.drillConfig = drillConfig;
     this.config = config;
@@ -95,7 +96,7 @@
           defaultInputFormat, storageEngineName, schemaName);
       throw new ExecutionSetupException(message);
     }
-    final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin, fs,
+    final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin,
        ImmutableList.of(Pattern.compile(".*")), ImmutableList.<MagicString>of());
     fileMatchers.add(fallbackMatcher);
   }
@@ -105,54 +106,21 @@ private Path getViewPath(String name) {
     return DotDrillType.VIEW.getPath(config.getLocation(), name);
   }
 
-  public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) {
+  public WorkspaceSchema createSchema(List<String> parentSchemaPath, UserSession session) throws IOException {
     return new WorkspaceSchema(parentSchemaPath, schemaName, session);
   }
 
-  @Override
-  public DrillTable create(String key) {
-    try {
+  public class WorkspaceSchema extends AbstractSchema implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
+    private final ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(this);
+    private final UserSession session;
+    private final DrillFileSystem fs;
 
-      FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key);
-      if (fileSelection == null) {
-        return null;
-      }
-
-      if (fileSelection.containsDirectories(fs)) {
-        for (FormatMatcher m : dirMatchers) {
-          try {
-            Object selection = m.isReadable(fileSelection);
-            if (selection != null) {
-              return new DynamicDrillTable(plugin, storageEngineName, selection);
-            }
-          } catch (IOException e) {
-            logger.debug("File read failed.", e);
-          }
-        }
-        fileSelection = fileSelection.minusDirectories(fs);
-      }
-
-      for (FormatMatcher m : fileMatchers) {
-        Object selection = m.isReadable(fileSelection);
-        if (selection != null) {
-          return new DynamicDrillTable(plugin, storageEngineName, selection);
-        }
-      }
-      return null;
-
-    } catch (IOException e) {
-      logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
+    public WorkspaceSchema(List<String> parentSchemaPath, String wsName, UserSession session) throws IOException {
+      super(parentSchemaPath, wsName);
+      this.session = session;
+      this.fs = new DrillFileSystem(fsConf);
     }
-    return null;
-  }
-
-  @Override
-  public void destroy(DrillTable value) {
-  }
-
-  public class WorkspaceSchema extends AbstractSchema {
 
     public boolean createView(View view) throws Exception {
       Path viewPath = getViewPath(view.getName());
       boolean replaced = fs.exists(viewPath);
@@ -186,15 +154,6 @@ public void dropView(String viewName) throws IOException {
       fs.delete(getViewPath(viewName), false);
     }
 
-    private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<>(WorkspaceSchemaFactory.this);
-
-    private UserSession session;
-
-    public WorkspaceSchema(List<String> parentSchemaPath, String name, UserSession session) {
-      super(parentSchemaPath, name);
-      this.session = session;
-    }
-
     private Set<String> getViews() {
       Set<String> viewSet = Sets.newHashSet();
       // Look for files with ".view.drill" extension.
@@ -282,5 +241,47 @@ public CreateTableEntry createNewTable(String tableName) {
     public String getTypeName() {
       return FileSystemConfig.NAME;
     }
+
+    @Override
+    public DrillTable create(String key) {
+      try {
+
+        FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key);
+        if (fileSelection == null) {
+          return null;
+        }
+
+        if (fileSelection.containsDirectories(fs)) {
+          for (FormatMatcher m : dirMatchers) {
+            try {
+              Object selection = m.isReadable(fs, fileSelection);
+              if (selection != null) {
+                return new DynamicDrillTable(plugin, storageEngineName, selection);
+              }
+            } catch (IOException e) {
+              logger.debug("File read failed.", e);
+            }
+          }
+          fileSelection = fileSelection.minusDirectories(fs);
+        }
+
+        for (FormatMatcher m : fileMatchers) {
+          Object selection = m.isReadable(fs, fileSelection);
+          if (selection != null) {
+            return new DynamicDrillTable(plugin, storageEngineName, selection);
+          }
+        }
+        return null;
+
+      } catch (IOException e) {
+        logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
+      }
+
+      return null;
+    }
+
+    @Override
+    public void destroy(DrillTable value) {
+    }
   }
 }
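Moving create()/destroy() from the factory into WorkspaceSchema is the behavioral heart of this file: table resolution now runs against a DrillFileSystem created per schema instance (hence per registerSchemas() call) rather than one cached for the life of the plugin, and the filesystem is threaded explicitly through every matcher probe. Condensed from the hunks above, for orientation only:

    // WorkspaceSchema owns its filesystem, built from FileSystemPlugin.getFsConf():
    this.fs = new DrillFileSystem(fsConf);

    // ...and passes it to the matchers when a table name is resolved:
    FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), key);
    for (FormatMatcher m : fileMatchers) {
      Object selection = m.isReadable(fs, fileSelection);  // fs is now a parameter
      if (selection != null) {
        return new DynamicDrillTable(plugin, storageEngineName, selection);
      }
    }
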
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 6e1e0cc2510..5c7152a6e9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -54,38 +54,39 @@
 import com.google.common.collect.Lists;
 
 public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
 
   private final BasicFormatMatcher matcher;
   private final DrillbitContext context;
   private final boolean readable;
   private final boolean writable;
   private final boolean blockSplittable;
-  private final DrillFileSystem fs;
+  private final Configuration fsConf;
   private final StoragePluginConfig storageConfig;
   protected final FormatPluginConfig formatConfig;
   private final String name;
   protected final CompressionCodecFactory codecFactory;
   private final boolean compressible;
 
-  protected EasyFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig,
-      T formatConfig, boolean readable, boolean writable, boolean blockSplittable, boolean compressible, List<String> extensions, String defaultName){
-    this.matcher = new BasicFormatMatcher(this, fs, extensions, compressible);
+  protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+      StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, boolean blockSplittable,
+      boolean compressible, List<String> extensions, String defaultName){
+    this.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible);
     this.readable = readable;
     this.writable = writable;
     this.context = context;
     this.blockSplittable = blockSplittable;
     this.compressible = compressible;
-    this.fs = fs;
+    this.fsConf = fsConf;
     this.storageConfig = storageConfig;
     this.formatConfig = formatConfig;
     this.name = name == null ? defaultName : name;
-    this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getConf()));
+    this.codecFactory = new CompressionCodecFactory(new Configuration(fsConf));
   }
 
   @Override
-  public DrillFileSystem getFileSystem() {
-    return fs;
+  public Configuration getFsConf() {
+    return fsConf;
   }
 
   @Override
@@ -152,7 +153,13 @@ RecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws Exe
     int numParts = 0;
     OperatorContext oContext = new OperatorContext(scan, context, false /* ScanBatch is not subject to fragment memory limit */);
-    DrillFileSystem dfs = new DrillFileSystem(fs, oContext.getStats());
+    DrillFileSystem dfs;
+    try {
+      dfs = new DrillFileSystem(fsConf, oContext.getStats());
+    } catch (IOException e) {
+      throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
+    }
+
     for(FileWork work : scan.getWorkUnits()){
       readers.add(getRecordReader(context, dfs, work, scan.getColumns()));
       if (scan.getSelectionRoot() != null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 54cad561b7e..7c70df34f28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -33,6 +33,7 @@
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
@@ -51,7 +52,7 @@
 @JsonTypeName("fs-scan")
 public class EasyGroupScan extends AbstractFileGroupScan{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
 
   private FileSelection selection;
   private final EasyFormatPlugin<?> formatPlugin;
@@ -109,9 +110,10 @@ private EasyGroupScan(EasyGroupScan that) {
   }
 
   private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
+    final DrillFileSystem dfs = new DrillFileSystem(formatPlugin.getFsConf());
     this.selection = selection;
-    BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem(), formatPlugin.getContext().getBits());
-    this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
+    BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits());
+    this.chunks = b.generateFileWork(selection.getFileStatusList(dfs), formatPlugin.isBlockSplittable());
     this.maxWidth = chunks.size();
     this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 2e654669cc9..32e34b47221 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
@@ -38,22 +39,26 @@
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
 
   private static final boolean IS_COMPRESSIBLE = true;
+  private static final String DEFAULT_NAME = "json";
+  private static final List<String> DEFAULT_EXTS = ImmutableList.of("json");
 
-  public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
-    this(name, context, fs, storageConfig, new JSONFormatConfig());
+  public JSONFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+    this(name, context, fsConf, storageConfig, new JSONFormatConfig());
   }
 
-  public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
-    super(name, context, fs, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE, Lists.newArrayList("json"), "json");
+  public JSONFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+      JSONFormatConfig formatPluginConfig) {
+    super(name, context, fsConf, config, formatPluginConfig, true, false, false, IS_COMPRESSIBLE,
+        DEFAULT_EXTS, DEFAULT_NAME);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index bf463951fd1..237589cf875 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.store.easy.text;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -42,6 +42,7 @@
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
 import org.apache.drill.exec.store.text.DrillTextRecordWriter;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
@@ -51,13 +52,17 @@
 import com.google.common.collect.Maps;
 
 public class TextFormatPlugin extends EasyFormatPlugin<TextFormatConfig> {
+  private final static String DEFAULT_NAME = "text";
 
-  public TextFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
-    super(name, context, fs, storageConfig, new TextFormatConfig(), true, false, true, true, new ArrayList<String>(), "text");
+  public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
+    super(name, context, fsConf, storageConfig, new TextFormatConfig(), true, false, true, true,
+        Collections.<String>emptyList(), DEFAULT_NAME);
   }
 
-  public TextFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, TextFormatConfig formatPluginConfig) {
-    super(name, context, fs, config, formatPluginConfig, true, false, true, true, formatPluginConfig.getExtensions(), "text");
+  public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config,
+      TextFormatConfig formatPluginConfig) {
+    super(name, context, fsConf, config, formatPluginConfig, true, false, true, true,
+        formatPluginConfig.getExtensions(), DEFAULT_NAME);
   }
 
 
@@ -67,7 +72,8 @@ public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs
     Path path = dfs.makeQualified(new Path(fileWork.getPath()));
     FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
     Preconditions.checkArgument(((TextFormatConfig)formatConfig).getDelimiter().length() == 1, "Only single character delimiter supported");
-    return new DrillTextRecordReader(split, context, ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
+    return new DrillTextRecordReader(split, getFsConf(), context,
+        ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index 77c6b9ad0c5..a1249e6b056 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -69,7 +69,7 @@ public StoragePluginConfig getConfig() {
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
     ISchema s = new ISchema(parent, this);
     parent.add(s.getName(), s);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 51b22080cc9..96226a1c37c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -55,7 +55,7 @@ public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath>
   }
 
   @Override
-  public void registerSchemas(UserSession session, SchemaPlus parent) {
+  public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index e204a2c28a3..9c83ea0fc38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -62,37 +62,44 @@
 import com.google.common.collect.Maps;
 
 public class ParquetFormatPlugin implements FormatPlugin{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
 
-  private final DrillbitContext context;
   public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
-  private CodecFactoryExposer codecFactoryExposer;
-  private final DrillFileSystem fs;
+
+  private static final String DEFAULT_NAME = "parquet";
+
+  private static final List<Pattern> PATTERNS = Lists.newArrayList(
+      Pattern.compile(".*\\.parquet$"),
+      Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE));
+  private static final List<MagicString> MAGIC_STRINGS = Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC));
+
+  private final DrillbitContext context;
+  private final CodecFactoryExposer codecFactoryExposer;
+  private final Configuration fsConf;
   private final ParquetFormatMatcher formatMatcher;
   private final ParquetFormatConfig config;
   private final StoragePluginConfig storageConfig;
   private final String name;
 
-  public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig){
-    this(name, context, fs, storageConfig, new ParquetFormatConfig());
+  public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+      StoragePluginConfig storageConfig){
+    this(name, context, fsConf, storageConfig, new ParquetFormatConfig());
   }
 
-  public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
+  public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
+      StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
     this.context = context;
-    this.codecFactoryExposer = new CodecFactoryExposer(fs.getConf());
+    this.codecFactoryExposer = new CodecFactoryExposer(fsConf);
     this.config = formatConfig;
-    this.formatMatcher = new ParquetFormatMatcher(this, fs);
+    this.formatMatcher = new ParquetFormatMatcher(this);
     this.storageConfig = storageConfig;
-    this.fs = fs;
-    this.name = name == null ? "parquet" : name;
-  }
-
-  Configuration getHadoopConfig() {
-    return fs.getConf();
+    this.fsConf = fsConf;
+    this.name = name == null ? DEFAULT_NAME : name;
   }
 
-  public DrillFileSystem getFileSystem() {
-    return fs;
+  @Override
+  public Configuration getFsConf() {
+    return fsConf;
   }
 
   @Override
@@ -155,12 +162,13 @@ public RecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming,
   @Override
   public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
-    return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
+    return getGroupScan(selection, null);
   }
 
   @Override
   public ParquetGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
-    return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns);
+    final DrillFileSystem dfs = new DrillFileSystem(fsConf);
+    return new ParquetGroupScan(selection.getFileStatusList(dfs), this, selection.selectionRoot, columns);
   }
 
   @Override
@@ -190,20 +198,8 @@ public FormatMatcher getMatcher() {
 
   private static class ParquetFormatMatcher extends BasicFormatMatcher{
 
-    private final DrillFileSystem fs;
-
-    public ParquetFormatMatcher(ParquetFormatPlugin plugin, DrillFileSystem fs) {
-      super(plugin, fs, //
-          Lists.newArrayList( //
-              Pattern.compile(".*\\.parquet$"), //
-              Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE) //
-              //
-          ),
-          Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC))
-
-      );
-      this.fs = fs;
-
+    public ParquetFormatMatcher(ParquetFormatPlugin plugin) {
+      super(plugin, PATTERNS, MAGIC_STRINGS);
     }
 
     @Override
@@ -212,17 +208,17 @@ public boolean supportDirectoryReads() {
     }
 
     @Override
-    public FormatSelection isReadable(FileSelection selection) throws IOException {
+    public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
      // TODO: we only check the first file for directory reading.  This is because
      if(selection.containsDirectories(fs)){
-        if(isDirReadable(selection.getFirstPath(fs))){
+        if(isDirReadable(fs, selection.getFirstPath(fs))){
          return new FormatSelection(plugin.getConfig(), selection);
        }
      }
-      return super.isReadable(selection);
+      return super.isReadable(fs, selection);
     }
 
-    boolean isDirReadable(FileStatus dir) {
+    boolean isDirReadable(DrillFileSystem fs, FileStatus dir) {
       Path p = new Path(dir.getPath(), ParquetFileWriter.PARQUET_METADATA_FILE);
       try {
        if (fs.exists(p)) {
@@ -235,16 +231,12 @@ boolean isDirReadable(FileStatus dir) {
        if (files.length == 0) {
          return false;
        }
-        return super.isReadable(files[0]);
+        return super.isReadable(fs, files[0]);
       }
     } catch (IOException e) {
       logger.info("Failure while attempting to check for Parquet metadata file.", e);
       return false;
     }
   }
-
-
-  }
-
 }
\ No newline at end of file
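Because ParquetFormatMatcher no longer captures a filesystem, its match patterns and magic strings can become static constants, and the plugin builds a short-lived DrillFileSystem whenever a group scan is created. The single-argument getGroupScan() now delegates to the two-argument form, so the filesystem is constructed in exactly one place — condensed from the hunks above, for illustration only:

    @Override
    public ParquetGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
      // Per-scan filesystem, built from the plugin's stored Configuration.
      final DrillFileSystem dfs = new DrillFileSystem(fsConf);
      return new ParquetGroupScan(selection.getFileStatusList(dfs), this, selection.selectionRoot, columns);
    }
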
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index acac61f3544..a59f2c98239 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -40,6 +40,7 @@
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
 import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
@@ -76,11 +77,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
-  static final String ENDPOINT_BYTES_TIMER = MetricRegistry.name(ParquetGroupScan.class, "endpointBytes");
-  static final String ASSIGNMENT_TIMER = MetricRegistry.name(ParquetGroupScan.class, "applyAssignments");
-  static final String ASSIGNMENT_AFFINITY_HIST = MetricRegistry.name(ParquetGroupScan.class, "assignmentAffinity");
-
-  final Histogram assignmentAffinityStats = metrics.histogram(ASSIGNMENT_AFFINITY_HIST);
 
   private ListMultimap<Integer, RowGroupInfo> mappings;
   private List<RowGroupInfo> rowGroupInfos;
@@ -135,7 +131,7 @@ public ParquetGroupScan( //
     Preconditions.checkNotNull(formatConfig);
     this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(formatPlugin);
-    this.fs = formatPlugin.getFileSystem();
+    this.fs = new DrillFileSystem(formatPlugin.getFsConf());
     this.formatConfig = formatPlugin.getConfig();
     this.entries = entries;
     this.selectionRoot = selectionRoot;
@@ -155,7 +151,7 @@ public ParquetGroupScan(List<FileStatus> files, //
     this.formatPlugin = formatPlugin;
     this.columns = columns;
     this.formatConfig = formatPlugin.getConfig();
-    this.fs = formatPlugin.getFileSystem();
+    this.fs = new DrillFileSystem(formatPlugin.getFsConf());
 
     this.entries = Lists.newArrayList();
     for (FileStatus file : files) {
@@ -205,7 +201,7 @@ private void readFooter(List<FileStatus> statuses) throws IOException {
       ColumnChunkMetaData columnChunkMetaData;
-      List