Skip to content

Commit

Permalink
DRILL-5089: Dynamically load schema of storage plugin only when neede…
Browse files Browse the repository at this point in the history
…d for every query

For each query, loading all storage plugins and loading all workspaces under file system plugins is not needed.

This patch uses DynamicRootSchema as the root schema for Drill, which loads the corresponding storage plugin only when needed.

The InfoSchema reader reads full schema information and loads second-level schemas accordingly.

For workspaces under the same file system, there is no need to create a separate FileSystem for each workspace.

Use the fs.access API to check permissions; it is available since HDFS 2.6, except in the Windows + local file system case.

Add unit tests that use a broken mock storage plugin: with a storage plugin that throws an Exception in its registerSchemas method,
all queries — even those on healthy storage plugins — would fail without this fix (Drill still loaded all schemas from all storage plugins).

This closes #1032
  • Loading branch information
chunhui-shi authored and parthchandra committed Nov 21, 2017
1 parent 5490f3d commit d4c61ca
Show file tree
Hide file tree
Showing 16 changed files with 526 additions and 47 deletions.
Expand Up @@ -230,7 +230,12 @@ public DrillbitContext getDrillbitContext() {
return context; return context;
} }


public SchemaPlus getRootSchema() { /**
* This method is only used to construct InfoSchemaReader, it is for the reader to get full schema, so here we
* are going to return a fully initialized schema tree.
* @return root schema's plus
*/
public SchemaPlus getFullRootSchema() {
if (queryContext == null) { if (queryContext == null) {
fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " + fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
"This is a non-root fragment.")); "This is a non-root fragment."));
Expand All @@ -248,7 +253,7 @@ public SchemaPlus getRootSchema() {
.setIgnoreAuthErrors(isImpersonationEnabled) .setIgnoreAuthErrors(isImpersonationEnabled)
.build(); .build();


return queryContext.getRootSchema(schemaConfig); return queryContext.getFullRootSchema(schemaConfig);
} }


/** /**
Expand Down
Expand Up @@ -163,14 +163,23 @@ public SchemaPlus getRootSchema(final String userName) {
} }


/** /**
* Create and return a SchemaTree with given <i>schemaConfig</i>. * Create and return a SchemaTree with given <i>schemaConfig</i> but some schemas (from storage plugins)
* could be initialized later.
* @param schemaConfig * @param schemaConfig
* @return * @return
*/ */
public SchemaPlus getRootSchema(SchemaConfig schemaConfig) { public SchemaPlus getRootSchema(SchemaConfig schemaConfig) {
return schemaTreeProvider.createRootSchema(schemaConfig); return schemaTreeProvider.createRootSchema(schemaConfig);
} }
/**
* Create and return a fully initialized SchemaTree with given <i>schemaConfig</i>.
* @param schemaConfig
* @return
*/


public SchemaPlus getFullRootSchema(SchemaConfig schemaConfig) {
return schemaTreeProvider.createFullRootSchema(schemaConfig);
}
/** /**
* Get the user name of the user who issued the query that is managed by this QueryContext. * Get the user name of the user who issued the query that is managed by this QueryContext.
* @return * @return
Expand Down
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.sql;

import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.calcite.DataContext;
import org.apache.calcite.jdbc.CalciteRootSchema;
import org.apache.calcite.jdbc.CalciteSchema;

import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.Compatible;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.SubSchemaWrapper;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;

/**
* This class is to allow us loading schemas from storage plugins later when {@link #getSubSchema(String, boolean)}
* is called.
*/
public class DynamicRootSchema extends DynamicSchema
implements CalciteRootSchema {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DynamicRootSchema.class);

protected SchemaConfig schemaConfig;
protected StoragePluginRegistry storages;

public StoragePluginRegistry getSchemaFactories() {
return storages;
}

/** Creates a root schema. */
DynamicRootSchema(StoragePluginRegistry storages, SchemaConfig schemaConfig) {
super(null, new RootSchema(), "");
this.schemaConfig = schemaConfig;
this.storages = storages;
}

@Override
public CalciteSchema getSubSchema(String schemaName, boolean caseSensitive) {
CalciteSchema retSchema = getSubSchemaMap().get(schemaName);
if (retSchema != null) {
return retSchema;
}

loadSchemaFactory(schemaName, caseSensitive);
retSchema = getSubSchemaMap().get(schemaName);
return retSchema;
}

@Override
public NavigableSet<String> getTableNames() {
return Compatible.INSTANCE.navigableSet(ImmutableSortedSet.<String>of());
}

/**
* load schema factory(storage plugin) for schemaName
* @param schemaName
* @param caseSensitive
*/
public void loadSchemaFactory(String schemaName, boolean caseSensitive) {
try {
SchemaPlus thisPlus = this.plus();
StoragePlugin plugin = getSchemaFactories().getPlugin(schemaName);
if (plugin != null) {
plugin.registerSchemas(schemaConfig, thisPlus);
return;
}

// Could not find the plugin of schemaName. The schemaName could be `dfs.tmp`, a 2nd level schema under 'dfs'
String[] paths = schemaName.split("\\.");
if (paths.length == 2) {
plugin = getSchemaFactories().getPlugin(paths[0]);
if (plugin == null) {
return;
}

// Found the storage plugin for first part(e.g. 'dfs') of schemaName (e.g. 'dfs.tmp')
// register schema for this storage plugin to 'this'.
plugin.registerSchemas(schemaConfig, thisPlus);

// Load second level schemas for this storage plugin
final SchemaPlus firstlevelSchema = thisPlus.getSubSchema(paths[0]);
final List<SchemaPlus> secondLevelSchemas = Lists.newArrayList();
for (String secondLevelSchemaName : firstlevelSchema.getSubSchemaNames()) {
secondLevelSchemas.add(firstlevelSchema.getSubSchema(secondLevelSchemaName));
}

for (SchemaPlus schema : secondLevelSchemas) {
org.apache.drill.exec.store.AbstractSchema drillSchema;
try {
drillSchema = schema.unwrap(org.apache.drill.exec.store.AbstractSchema.class);
} catch (ClassCastException e) {
throw new RuntimeException(String.format("Schema '%s' is not expected under root schema", schema.getName()));
}
SubSchemaWrapper wrapper = new SubSchemaWrapper(drillSchema);
thisPlus.add(wrapper.getName(), wrapper);
}
}
} catch(ExecutionSetupException | IOException ex) {
logger.warn("Failed to load schema for \"" + schemaName + "\"!", ex);
}
}

static class RootSchema extends AbstractSchema {
@Override public Expression getExpression(SchemaPlus parentSchema,
String name) {
return Expressions.call(
DataContext.ROOT,
BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method);
}
}
}

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.sql;

import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.SimpleCalciteSchema;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginRegistry;


/**
 * Unlike SimpleCalciteSchema, a DynamicSchema may start with an empty or partial schema map;
 * a sub-schema is registered on demand, when the corresponding name is first requested.
 */
public class DynamicSchema extends SimpleCalciteSchema {

  public DynamicSchema(CalciteSchema parent, Schema schema, String name) {
    super(parent, schema, name);
  }

  /**
   * Looks up a sub-schema, preferring the underlying Schema object; falls back to the
   * map of already-registered sub-schemas.
   */
  @Override
  public CalciteSchema getSubSchema(String schemaName, boolean caseSensitive) {
    final Schema sub = schema.getSubSchema(schemaName);
    return (sub == null)
        ? getSubSchemaMap().get(schemaName)
        : new DynamicSchema(this, sub, schemaName);
  }

  @Override
  public SchemaPlus plus() {
    return super.plus();
  }

  /**
   * Creates a lazily-populated root schema backed by the given storage plugin registry.
   *
   * @param storages registry used to resolve storage plugins by name
   * @param schemaConfig per-query schema configuration
   * @return the SchemaPlus view of the new root schema
   */
  public static SchemaPlus createRootSchema(StoragePluginRegistry storages, SchemaConfig schemaConfig) {
    return new DynamicRootSchema(storages, schemaConfig).plus();
  }
}
Expand Up @@ -24,7 +24,6 @@
import com.google.common.base.Strings; import com.google.common.base.Strings;
import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.CalciteSchemaImpl;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.ConventionTraitDef; import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCluster;
Expand Down Expand Up @@ -117,9 +116,9 @@ public SqlConverter(QueryContext context) {
this.session = context.getSession(); this.session = context.getSession();
this.drillConfig = context.getConfig(); this.drillConfig = context.getConfig();
this.catalog = new DrillCalciteCatalogReader( this.catalog = new DrillCalciteCatalogReader(
this.rootSchema, rootSchema,
parserConfig.caseSensitive(), parserConfig.caseSensitive(),
CalciteSchemaImpl.from(defaultSchema).path(null), DynamicSchema.from(defaultSchema).path(null),
typeFactory, typeFactory,
drillConfig, drillConfig,
session); session);
Expand Down Expand Up @@ -297,7 +296,7 @@ public RelNode expandView(RelDataType rowType, String queryString, List<String>
@Override @Override
public RelNode expandView(RelDataType rowType, String queryString, SchemaPlus rootSchema, List<String> schemaPath) { public RelNode expandView(RelDataType rowType, String queryString, SchemaPlus rootSchema, List<String> schemaPath) {
final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader( final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader(
rootSchema, // new root schema rootSchema,
parserConfig.caseSensitive(), parserConfig.caseSensitive(),
schemaPath, schemaPath,
typeFactory, typeFactory,
Expand Down Expand Up @@ -443,7 +442,7 @@ private class DrillCalciteCatalogReader extends CalciteCatalogReader {
JavaTypeFactory typeFactory, JavaTypeFactory typeFactory,
DrillConfig drillConfig, DrillConfig drillConfig,
UserSession session) { UserSession session) {
super(CalciteSchemaImpl.from(rootSchema), caseSensitive, defaultSchema, typeFactory); super(DynamicSchema.from(rootSchema), caseSensitive, defaultSchema, typeFactory);
this.drillConfig = drillConfig; this.drillConfig = drillConfig;
this.session = session; this.session = session;
this.allowTemporaryTables = true; this.allowTemporaryTables = true;
Expand Down
Expand Up @@ -20,12 +20,12 @@
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;


import org.apache.calcite.jdbc.SimpleCalciteSchema;
import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.ViewExpansionContext; import org.apache.drill.exec.ops.ViewExpansionContext;
import org.apache.drill.exec.planner.sql.DynamicSchema;
import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.server.options.OptionValue;
Expand Down Expand Up @@ -105,12 +105,36 @@ public SchemaPlus createRootSchema(final String userName, final SchemaConfigInfo
* @return * @return
*/ */
public SchemaPlus createRootSchema(SchemaConfig schemaConfig) { public SchemaPlus createRootSchema(SchemaConfig schemaConfig) {
final SchemaPlus rootSchema = DynamicSchema.createRootSchema(dContext.getStorage(), schemaConfig);
schemaTreesToClose.add(rootSchema);
return rootSchema;
}

/**
* Return full root schema with schema owner as the given user.
*
* @param userName Name of the user who is accessing the storage sources.
* @param provider {@link SchemaConfigInfoProvider} instance
* @return Root of the schema tree.
*/
public SchemaPlus createFullRootSchema(final String userName, final SchemaConfigInfoProvider provider) {
final String schemaUser = isImpersonationEnabled ? userName : ImpersonationUtil.getProcessUserName();
final SchemaConfig schemaConfig = SchemaConfig.newBuilder(schemaUser, provider).build();
return createFullRootSchema(schemaConfig);
}
/**
* Create and return a Full SchemaTree with given <i>schemaConfig</i>.
* @param schemaConfig
* @return
*/
public SchemaPlus createFullRootSchema(SchemaConfig schemaConfig) {
try { try {
final SchemaPlus rootSchema = SimpleCalciteSchema.createRootSchema(false); final SchemaPlus rootSchema = DynamicSchema.createRootSchema(dContext.getStorage(), schemaConfig);
dContext.getSchemaFactory().registerSchemas(schemaConfig, rootSchema); dContext.getSchemaFactory().registerSchemas(schemaConfig, rootSchema);
schemaTreesToClose.add(rootSchema); schemaTreesToClose.add(rootSchema);
return rootSchema; return rootSchema;
} catch(IOException e) { }
catch(IOException e) {
// We can't proceed further without a schema, throw a runtime exception. // We can't proceed further without a schema, throw a runtime exception.
// Improve the error message for client side. // Improve the error message for client side.


Expand All @@ -124,6 +148,7 @@ public SchemaPlus createRootSchema(SchemaConfig schemaConfig) {
.addContext(contextString) .addContext(contextString)
.build(logger); .build(logger);
} }

} }


@Override @Override
Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.exception.StoreException; import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.planner.logical.StoragePlugins; import org.apache.drill.exec.planner.logical.StoragePlugins;
import org.apache.drill.exec.planner.sql.DynamicSchema;
import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.dfs.FileSystemPlugin; import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.dfs.FormatPlugin;
Expand Down Expand Up @@ -494,4 +495,6 @@ public static Map<Object, Constructor<? extends StoragePlugin>> findAvailablePlu
return availablePlugins; return availablePlugins;
} }




} }

0 comments on commit d4c61ca

Please sign in to comment.