Skip to content

Commit

Permalink
DRILL-2413: FileSystemPlugin refactoring: avoid sharing DrillFileSystem across schemas
Browse files Browse the repository at this point in the history
Loading branch information
vkorukanti committed Apr 21, 2015
1 parent fbb405b commit 117b749
Show file tree
Hide file tree
Showing 34 changed files with 279 additions and 221 deletions.
Expand Up @@ -46,7 +46,7 @@ public HBaseSchemaFactory(HBaseStoragePlugin plugin, String name) throws IOExcep
}

@Override
public void registerSchemas(UserSession session, SchemaPlus parent) {
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
HBaseSchema schema = new HBaseSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
Expand Down
Expand Up @@ -66,7 +66,7 @@ public HBaseGroupScan getPhysicalScan(JSONOptions selection) throws IOException
}

@Override
public void registerSchemas(UserSession session, SchemaPlus parent) {
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
}

Expand Down
Expand Up @@ -82,7 +82,7 @@ public HiveScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns)
}

@Override
public void registerSchemas(UserSession session, SchemaPlus parent) {
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
}
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.hive.schema;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -186,7 +187,7 @@ public HiveReadEntry load(String key) throws Exception {
}

@Override
public void registerSchemas(UserSession session, SchemaPlus parent) {
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
HiveSchema schema = new HiveSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
Expand Down
Expand Up @@ -63,7 +63,7 @@ public MongoStoragePluginConfig getConfig() {
}

@Override
public void registerSchemas(UserSession session, SchemaPlus parent) {
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
}

Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.mongo.schema;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -36,7 +37,6 @@
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
import org.apache.drill.exec.store.mongo.MongoCnxnManager;
import org.apache.drill.exec.store.mongo.MongoScanSpec;
import org.apache.drill.exec.store.mongo.MongoStoragePlugin;
Expand Down Expand Up @@ -120,7 +120,7 @@ public List<String> load(String dbName) throws Exception {
}

@Override
public void registerSchemas(UserSession session, SchemaPlus parent) {
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
MongoSchema schema = new MongoSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
Expand Down
Expand Up @@ -24,7 +24,6 @@
import java.util.Map;

import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
Expand Down Expand Up @@ -64,7 +63,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {

private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
private final DrillbitContext context;
private final UserClientConnection connection; // is null if attached to non-root fragment
private final UserClientConnection connection; // is null if this context is for non-root fragment
private final QueryContext queryContext; // is null if this context is for non-root fragment
private final FragmentStats stats;
private final FunctionImplementationRegistry funcRegistry;
private final BufferAllocator allocator;
Expand All @@ -87,10 +87,34 @@ public void accept(final RpcException e) {
private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
private final AccountingUserConnection accountingUserConnection;

/**
 * Create a FragmentContext instance for a non-root fragment.
 * Delegates to the full constructor with a null QueryContext and a null
 * UserClientConnection, so this context has no client connection and cannot
 * build a root schema tree (getRootSchema fails for non-root fragments).
 *
 * @param dbContext drillbit-level context for the node running this fragment.
 * @param fragment the plan fragment to execute.
 * @param funcRegistry registry used to resolve function implementations.
 * @throws ExecutionSetupException if the fragment context cannot be initialized.
 */
public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
this(dbContext, fragment, null, null, funcRegistry);
}

/**
* Create a FragmentContext instance for root fragment.
*
* @param dbContext DrillbitContext.
* @param fragment Fragment implementation.
* @param queryContext QueryContext.
* @param connection UserClientConnection.
* @param funcRegistry FunctionImplementationRegistry.
* @throws ExecutionSetupException
*/
public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext,
final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
throws ExecutionSetupException {
this.context = dbContext;
this.queryContext = queryContext;
this.connection = connection;
this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler);
this.fragment = fragment;
Expand Down Expand Up @@ -128,6 +152,15 @@ public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragm
bufferManager = new BufferManager(this.allocator, this);
}

/**
 * Test-compatibility constructor: delegates to the full constructor with a null
 * QueryContext, so the resulting context behaves like a non-root fragment that
 * still has a client connection.
 *
 * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
 * the long list of test files.
 */
public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException {
this(dbContext, fragment, null, connection, funcRegistry);
}

/**
 * Returns the OptionManager holding the options that apply to this fragment.
 */
public OptionManager getOptions() {
return fragmentOptions;
}
Expand Down Expand Up @@ -162,15 +195,13 @@ public DrillbitContext getDrillbitContext() {
}

public SchemaPlus getRootSchema() {
if (connection == null) {
if (queryContext == null) {
fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
"This is a non-root fragment."));
return null;
}

final SchemaPlus root = SimpleOptiqSchema.createRootSchema(false);
context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root);
return root;
return queryContext.getRootSchema();
}

/**
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.ops;

import java.io.IOException;
import java.util.Collection;

import io.netty.buffer.DrillBuf;
Expand Down Expand Up @@ -46,7 +47,7 @@
// TODO - consider re-name to PlanningContext, as the query execution context actually appears
// in fragment contexts
public class QueryContext implements AutoCloseable, UdfUtilities {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);

private final DrillbitContext drillbitContext;
private final UserSession session;
Expand Down Expand Up @@ -113,9 +114,15 @@ public SchemaPlus getNewDefaultSchema() {
}

public SchemaPlus getRootSchema() {
final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
return rootSchema;
try {
final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
return rootSchema;
} catch(IOException e) {
final String errMsg = String.format("Failed to create schema tree: %s", e.getMessage());
logger.error(errMsg, e);
throw new DrillRuntimeException(errMsg, e);
}
}

public OptionManager getOptions() {
Expand Down
Expand Up @@ -21,8 +21,10 @@

import org.apache.drill.exec.rpc.user.UserSession;

import java.io.IOException;

public interface SchemaFactory {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaFactory.class);

public void registerSchemas(UserSession session, SchemaPlus parent);
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException;
}
Expand Up @@ -301,7 +301,7 @@ public DrillSchemaFactory getSchemaFactory() {
public class DrillSchemaFactory implements SchemaFactory {

@Override
public void registerSchemas(UserSession session, SchemaPlus parent) {
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
Stopwatch watch = new Stopwatch();
watch.start();

Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.util.List;
Expand All @@ -40,13 +41,13 @@
*/
public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {

public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs,
public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
StoragePluginConfig storagePluginConfig) {
this(name, context, fs, storagePluginConfig, new AvroFormatConfig());
this(name, context, fsConf, storagePluginConfig, new AvroFormatConfig());
}

public AvroFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
super(name, context, fsConf, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
}

@Override
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.compress.CompressionCodec;
Expand All @@ -36,34 +37,32 @@
public class BasicFormatMatcher extends FormatMatcher{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicFormatMatcher.class);

private final List<Pattern> patterns;
private final MagicStringMatcher matcher;
protected final DrillFileSystem fs;
protected final FormatPlugin plugin;
protected final boolean compressible;
protected final CompressionCodecFactory codecFactory;

public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<Pattern> patterns, List<MagicString> magicStrings) {
private final List<Pattern> patterns;
private final MagicStringMatcher matcher;

public BasicFormatMatcher(FormatPlugin plugin, List<Pattern> patterns, List<MagicString> magicStrings) {
super();
this.patterns = ImmutableList.copyOf(patterns);
this.matcher = new MagicStringMatcher(magicStrings);
this.fs = fs;
this.plugin = plugin;
this.compressible = false;
this.codecFactory = null;
}

public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<String> extensions, boolean compressible) {
public BasicFormatMatcher(FormatPlugin plugin, Configuration fsConf, List<String> extensions, boolean compressible) {
List<Pattern> patterns = Lists.newArrayList();
for (String extension : extensions) {
patterns.add(Pattern.compile(".*\\." + extension));
}
this.patterns = patterns;
this.matcher = new MagicStringMatcher(new ArrayList<MagicString>());
this.fs = fs;
this.plugin = plugin;
this.compressible = compressible;
this.codecFactory = new CompressionCodecFactory(fs.getConf());
this.codecFactory = new CompressionCodecFactory(fsConf);
}

@Override
Expand All @@ -72,8 +71,8 @@ public boolean supportDirectoryReads() {
}

@Override
public FormatSelection isReadable(FileSelection selection) throws IOException {
if (isReadable(selection.getFirstPath(fs))) {
public FormatSelection isReadable(DrillFileSystem fs, FileSelection selection) throws IOException {
if (isReadable(fs, selection.getFirstPath(fs))) {
if (plugin.getName() != null) {
NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
namedConfig.name = plugin.getName();
Expand All @@ -85,7 +84,7 @@ public FormatSelection isReadable(FileSelection selection) throws IOException {
return null;
}

protected final boolean isReadable(FileStatus status) throws IOException {
protected final boolean isReadable(DrillFileSystem fs, FileStatus status) throws IOException {
CompressionCodec codec = null;
if (compressible) {
codec = codecFactory.getCodec(status.getPath());
Expand All @@ -103,7 +102,7 @@ protected final boolean isReadable(FileStatus status) throws IOException {
}
}

if (matcher.matches(status)) {
if (matcher.matches(fs, status)) {
return true;
}
return false;
Expand All @@ -128,7 +127,7 @@ public MagicStringMatcher(List<MagicString> magicStrings) {
}
}

public boolean matches(FileStatus status) throws IOException{
public boolean matches(DrillFileSystem fs, FileStatus status) throws IOException{
if (ranges.isEmpty()) {
return false;
}
Expand Down
Expand Up @@ -96,11 +96,11 @@ public void run() {
private final OperatorStats operatorStats;

public DrillFileSystem(Configuration fsConf) throws IOException {
this(FileSystem.get(fsConf), null);
this(fsConf, null);
}

public DrillFileSystem(FileSystem fs, OperatorStats operatorStats) {
this.underlyingFs = fs;
public DrillFileSystem(Configuration fsConf, OperatorStats operatorStats) throws IOException {
this.underlyingFs = FileSystem.get(fsConf);
this.operatorStats = operatorStats;
}

Expand Down

0 comments on commit 117b749

Please sign in to comment.